• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

bramp / build-along / 20044335102

08 Dec 2025 10:05PM UTC coverage: 90.041% (-0.4%) from 90.47%
20044335102

push

github

bramp
Fix subassembly detection when diagrams inside fail during clustering

When SubAssemblyClassifier.build() built diagrams inside a subassembly,
the first diagram would cluster with nearby images and claim them via
_fail_conflicting_candidates(). Subsequent diagram candidates that shared
those images were marked as failed. When the code tried to build those
failed candidates, CandidateFailedError was raised and the entire
subassembly build failed.

Changes:
- Import CandidateFailedError in subassembly_classifier.py
- Skip diagram candidates that already have a failure_reason
- Wrap result.build() in try/except to catch CandidateFailedError and
  continue to next diagram instead of failing the whole subassembly
- Shrink constraint bbox by 3 points when building diagrams inside
  subassemblies to avoid capturing the white border
- Pass constraint_bbox parameter to _build_subassembly_steps()

This fixes page 015 where the subassembly below step 15 wasn't detected.

11 of 13 new or added lines in 1 file covered. (84.62%)

189 existing lines in 7 files now uncovered.

11320 of 12572 relevant lines covered (90.04%)

0.9 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.43
/src/build_a_long/pdf_extract/classifier/steps/step_classifier.py
1
"""
2
Step classifier.
3

4
Purpose
5
-------
6
Identify complete Step structures by combining step_number, parts_list, and diagram
7
elements. A Step represents a single building instruction comprising:
8
- A StepNumber label
9
- An optional PartsList (the parts needed for this step)
10
- A Diagram (the main instruction graphic showing what to build)
11

12
We look for step_numbers and attempt to pair them with nearby parts_lists and
13
identify the appropriate diagram region for each step.
14

15
Debugging
16
---------
17
Set environment variables to aid investigation without code changes:
18

19
- LOG_LEVEL=DEBUG
20
    Enables DEBUG-level logging (if not already configured by caller).
21
"""
22

23
import logging
1✔
24

25
import numpy as np
1✔
26
from scipy.optimize import linear_sum_assignment
1✔
27

28
from build_a_long.pdf_extract.classifier.candidate import Candidate
1✔
29
from build_a_long.pdf_extract.classifier.classification_result import (
1✔
30
    CandidateFailedError,
31
    ClassificationResult,
32
)
33
from build_a_long.pdf_extract.classifier.label_classifier import (
1✔
34
    LabelClassifier,
35
)
36
from build_a_long.pdf_extract.classifier.score import (
1✔
37
    Score,
38
    Weight,
39
    find_best_scoring,
40
)
41
from build_a_long.pdf_extract.classifier.text import (
1✔
42
    extract_step_number_value,
43
)
44
from build_a_long.pdf_extract.extractor.bbox import BBox, filter_overlapping
1✔
45
from build_a_long.pdf_extract.extractor.lego_page_elements import (
1✔
46
    Arrow,
47
    Diagram,
48
    Divider,
49
    LegoPageElements,
50
    PartsList,
51
    RotationSymbol,
52
    Step,
53
    StepNumber,
54
    SubAssembly,
55
)
56
from build_a_long.pdf_extract.extractor.page_blocks import Text
1✔
57

58
log = logging.getLogger(__name__)
1✔
59

60

61
class _StepScore(Score):
1✔
62
    """Internal score representation for step classification."""
63

64
    step_value: int
65
    """The parsed step number value (e.g., 1, 2, 3)."""
1✔
66

67
    step_number_candidate: Candidate
68
    """The step number candidate this step is associated with."""
1✔
69

70
    parts_list_candidate: Candidate | None
71
    """The parts list candidate paired with this step (if any)."""
1✔
72

73
    has_parts_list: bool
74
    """Whether this step has an associated parts list."""
1✔
75

76
    step_proximity_score: float
77
    """Score based on proximity to the PartsList above (0.0-1.0).
1✔
78
    1.0 for closest proximity, 0.0 if very far. 0.0 if no parts list."""
79

80
    step_alignment_score: float
81
    """Score based on left-edge alignment with PartsList above (0.0-1.0).
1✔
82
    1.0 is perfect alignment, 0.0 is very misaligned. 0.0 if no parts list."""
83

84
    def score(self) -> Weight:
1✔
85
        """Return the overall pairing score."""
UNCOV
86
        return self.overall_score()
×
87

88
    def overall_score(self) -> float:
1✔
89
        """Calculate overall quality score based on parts list pairing.
90

91
        Steps with a parts_list are given a bonus to prefer them over
92
        steps without parts_list. Diagrams are found at build time, not
93
        during scoring, to allow rotation symbols to claim small images first.
94
        """
95
        if self.has_parts_list:
1✔
96
            # Base score for having parts_list + proximity/alignment bonus
97
            parts_list_bonus = 0.5
1✔
98
            pairing_score = (
1✔
99
                self.step_proximity_score + self.step_alignment_score
100
            ) / 2.0
101
            return parts_list_bonus + 0.5 * pairing_score
1✔
102
        return 0.3  # Lower base score for steps without parts list
1✔
103

104
    def sort_key(self) -> tuple[float, int]:
1✔
105
        """Return a tuple for sorting candidates.
106

107
        We prefer:
108
        1. Higher overall scores (better StepNumber-PartsList-Diagram match)
109
        2. Lower step number values (to break ties and maintain order)
110
        """
111
        return (-self.overall_score(), self.step_value)
1✔
112

113

114
class StepClassifier(LabelClassifier):
1✔
115
    """Classifier for complete Step structures."""
116

117
    output = "step"
1✔
118
    requires = frozenset(
1✔
119
        {"step_number", "parts_list", "diagram", "rotation_symbol", "subassembly"}
120
    )
121

122
    def _score(self, result: ClassificationResult) -> None:
1✔
123
        """Score step pairings and create candidates."""
124
        page_data = result.page_data
1✔
125

126
        # Get step number and parts list candidates (not constructed elements)
127
        step_number_candidates = result.get_scored_candidates(
1✔
128
            "step_number", valid_only=False, exclude_failed=True
129
        )
130

131
        if not step_number_candidates:
1✔
132
            return
1✔
133

134
        # Get parts_list candidates
135
        parts_list_candidates = result.get_scored_candidates(
1✔
136
            "parts_list",
137
            valid_only=False,
138
            exclude_failed=True,
139
        )
140

141
        log.debug(
1✔
142
            "[step] page=%s step_candidates=%d parts_list_candidates=%d",
143
            page_data.page_number,
144
            len(step_number_candidates),
145
            len(parts_list_candidates),
146
        )
147

148
        # Create all possible Step candidates for pairings (without diagrams initially)
149
        # Deduplication happens at build time, not scoring time, so that
150
        # diagram assignment can be re-evaluated as blocks get claimed.
151
        all_candidates: list[Candidate] = []
1✔
152
        for step_candidate in step_number_candidates:
1✔
153
            # Create candidates for this StepNumber paired with each PartsList
154
            for parts_list_candidate in parts_list_candidates:
1✔
155
                candidate = self._create_step_candidate(
1✔
156
                    step_candidate, parts_list_candidate, result
157
                )
158
                if candidate:
1✔
159
                    all_candidates.append(candidate)
1✔
160

161
            # Also create a candidate with no PartsList (fallback)
162
            candidate = self._create_step_candidate(step_candidate, None, result)
1✔
163
            if candidate:
1✔
164
                all_candidates.append(candidate)
1✔
165

166
        # Add all candidates to result (deduplication happens at build time)
167
        for candidate in all_candidates:
1✔
168
            result.add_candidate(candidate)
1✔
169

170
        log.debug(
1✔
171
            "[step] Created %d step candidates",
172
            len(all_candidates),
173
        )
174

175
    def build_all(self, result: ClassificationResult) -> list[LegoPageElements]:
1✔
176
        """Build all Step elements with coordinated rotation symbol assignment.
177

178
        This method:
179
        1. Builds all rotation symbols first (so they claim their Drawing blocks)
180
        2. Builds all parts lists (so they claim their blocks)
181
        3. Builds Step candidates, deduplicating by step value at build time
182
        4. Uses Hungarian matching to optimally assign rotation symbols to steps
183

184
        Deduplication happens at build time (not scoring time) so that diagram
185
        assignment can be re-evaluated as blocks get claimed by other elements.
186

187
        This coordination ensures:
188
        - Rotation symbols are built before diagrams, preventing incorrect clustering
189
        - Each rotation symbol is assigned to at most one step
190
        - Assignment is globally optimal based on distance to step diagrams
191

192
        Returns:
193
            List of successfully constructed Step elements, sorted by step number
194
        """
195
        # Phase 1: Build all rotation symbols BEFORE steps.
196
        # This allows rotation symbols to claim their Drawing blocks first,
197
        # preventing them from being incorrectly clustered into diagrams.
198
        for rs_candidate in result.get_scored_candidates(
1✔
199
            "rotation_symbol", valid_only=False, exclude_failed=True
200
        ):
201
            try:
1✔
202
                result.build(rs_candidate)
1✔
203
                log.debug(
1✔
204
                    "[step] Built rotation symbol at %s (score=%.2f)",
205
                    rs_candidate.bbox,
206
                    rs_candidate.score,
207
                )
208
            except Exception as e:
×
UNCOV
209
                log.debug(
×
210
                    "[step] Failed to construct rotation_symbol candidate at %s: %s",
211
                    rs_candidate.bbox,
212
                    e,
213
                )
214

215
        # Phase 2: Build all parts lists BEFORE steps.
216
        # This allows parts lists to claim their Drawing blocks first,
217
        # preventing them from being claimed by subassemblies.
218
        for pl_candidate in result.get_scored_candidates(
1✔
219
            "parts_list", valid_only=False, exclude_failed=True
220
        ):
221
            try:
1✔
222
                result.build(pl_candidate)
1✔
223
                log.debug(
1✔
224
                    "[step] Built parts_list at %s (score=%.2f)",
225
                    pl_candidate.bbox,
226
                    pl_candidate.score,
227
                )
228
            except Exception as e:
×
UNCOV
229
                log.debug(
×
230
                    "[step] Failed to construct parts_list candidate at %s: %s",
231
                    pl_candidate.bbox,
232
                    e,
233
                )
234

235
        # Phase 3: Build all subassemblies BEFORE steps.
236
        # This allows us to exclude diagrams inside subassemblies from being
237
        # selected as the main step diagram.
238
        for sa_candidate in result.get_scored_candidates(
1✔
239
            "subassembly", valid_only=False, exclude_failed=True
240
        ):
241
            try:
1✔
242
                result.build(sa_candidate)
1✔
243
                log.debug(
1✔
244
                    "[step] Built subassembly at %s (score=%.2f)",
245
                    sa_candidate.bbox,
246
                    sa_candidate.score,
247
                )
UNCOV
248
            except Exception as e:
×
UNCOV
249
                log.debug(
×
250
                    "[step] Failed to construct subassembly candidate at %s: %s",
251
                    sa_candidate.bbox,
252
                    e,
253
                )
254

255
        # Phase 4: Build Step candidates with deduplication by step value
256
        # Filter out subassembly steps, then build in score order, skipping
257
        # step values that have already been built.
258
        # Steps are built as "partial" - just step_number + parts_list.
259
        # Diagrams and subassemblies are assigned later via Hungarian matching.
260
        all_step_candidates = result.get_scored_candidates(
1✔
261
            "step", valid_only=False, exclude_failed=True
262
        )
263
        page_level_step_candidates = self._filter_page_level_step_candidates(
1✔
264
            all_step_candidates
265
        )
266

267
        # Sort by score (highest first) so best candidates get built first
268
        sorted_candidates = sorted(
1✔
269
            page_level_step_candidates,
270
            key=lambda c: c.score,
271
            reverse=True,
272
        )
273

274
        steps: list[Step] = []
1✔
275
        built_step_values: set[int] = set()
1✔
276

277
        for step_candidate in sorted_candidates:
1✔
278
            # Get step value from score
279
            score = step_candidate.score_details
1✔
280
            if not isinstance(score, _StepScore):
1✔
UNCOV
281
                continue
×
282

283
            # Skip if we've already built a step with this value
284
            if score.step_value in built_step_values:
1✔
285
                log.debug(
1✔
286
                    "[step] Skipping duplicate step %d candidate (score=%.2f)",
287
                    score.step_value,
288
                    step_candidate.score,
289
                )
290
                continue
1✔
291

292
            try:
1✔
293
                elem = result.build(step_candidate)
1✔
294
                assert isinstance(elem, Step)
1✔
295
                steps.append(elem)
1✔
296
                built_step_values.add(score.step_value)
1✔
297
                log.debug(
1✔
298
                    "[step] Built partial step %d (parts_list=%s, score=%.2f)",
299
                    score.step_value,
300
                    "yes" if score.parts_list_candidate is not None else "no",
301
                    step_candidate.score,
302
                )
303
            except Exception as e:
1✔
304
                log.debug(
1✔
305
                    "[step] Failed to construct step %d candidate: %s",
306
                    score.step_value,
307
                    e,
308
                )
309

310
        # Sort steps by their step_number value
311
        steps.sort(key=lambda step: step.step_number.value)
1✔
312

313
        # Phase 5: Assign diagrams to steps using Hungarian matching
314
        # Collect available diagram candidates (not yet claimed by subassemblies)
315
        available_diagrams: list[Diagram] = []
1✔
316
        for diag_candidate in result.get_scored_candidates(
1✔
317
            "diagram", valid_only=False, exclude_failed=True
318
        ):
319
            if diag_candidate.constructed is None:
1✔
320
                # Build the diagram
321
                try:
1✔
322
                    diag_elem = result.build(diag_candidate)
1✔
323
                    assert isinstance(diag_elem, Diagram)
1✔
324
                    available_diagrams.append(diag_elem)
1✔
325
                except Exception as e:
1✔
326
                    log.debug(
1✔
327
                        "[step] Failed to build diagram at %s: %s",
328
                        diag_candidate.bbox,
329
                        e,
330
                    )
331
            elif isinstance(diag_candidate.constructed, Diagram):
1✔
332
                # Already built (claimed by subassembly) - skip
333
                pass
1✔
334

335
        log.debug(
1✔
336
            "[step] Available diagrams for step assignment: %d",
337
            len(available_diagrams),
338
        )
339

340
        # Assign diagrams to steps using Hungarian matching
341
        assign_diagrams_to_steps(steps, available_diagrams)
1✔
342

343
        # Phase 6: Assign subassemblies to steps using Hungarian matching
344
        # Collect built subassemblies
345
        subassemblies: list[SubAssembly] = []
1✔
346
        for sa_candidate in result.get_scored_candidates(
1✔
347
            "subassembly", valid_only=True
348
        ):
349
            assert sa_candidate.constructed is not None
1✔
350
            assert isinstance(sa_candidate.constructed, SubAssembly)
1✔
351
            subassemblies.append(sa_candidate.constructed)
1✔
352

353
        # Collect built dividers for obstruction checking
354
        dividers: list[Divider] = []
1✔
355
        for div_candidate in result.get_scored_candidates("divider", valid_only=True):
1✔
356
            assert div_candidate.constructed is not None
1✔
357
            assert isinstance(div_candidate.constructed, Divider)
1✔
358
            dividers.append(div_candidate.constructed)
1✔
359

360
        log.debug(
1✔
361
            "[step] Assigning %d subassemblies to %d steps (%d dividers for "
362
            "obstruction checking)",
363
            len(subassemblies),
364
            len(steps),
365
            len(dividers),
366
        )
367

368
        # Assign subassemblies to steps using Hungarian matching
369
        assign_subassemblies_to_steps(steps, subassemblies, dividers)
1✔
370

371
        # Phase 7: Finalize steps - compute arrows and final bboxes
372
        page_bbox = result.page_data.bbox
1✔
373
        for step in steps:
1✔
374
            # Get arrows for this step
375
            arrows = self._get_arrows_for_step(step.step_number, step.diagram, result)
1✔
376

377
            # Compute final bbox including all components
378
            final_bbox = self._compute_step_bbox(
1✔
379
                step.step_number,
380
                step.parts_list,
381
                step.diagram,
382
                step.subassemblies,
383
                page_bbox,
384
            )
385

386
            # Update the step (Step is a Pydantic model, so we need to reassign)
387
            # We mutate in place by directly setting attributes
388
            object.__setattr__(step, "arrows", arrows)
1✔
389
            object.__setattr__(step, "bbox", final_bbox)
1✔
390

391
        # Phase 8: Assign rotation symbols to steps using Hungarian matching
392
        # Collect built rotation symbols
393
        rotation_symbols: list[RotationSymbol] = []
1✔
394
        for rs_candidate in result.get_scored_candidates(
1✔
395
            "rotation_symbol", valid_only=True
396
        ):
397
            assert rs_candidate.constructed is not None
1✔
398
            assert isinstance(rs_candidate.constructed, RotationSymbol)
1✔
399
            rotation_symbols.append(rs_candidate.constructed)
1✔
400

401
        assign_rotation_symbols_to_steps(steps, rotation_symbols)
1✔
402

403
        log.debug(
1✔
404
            "[step] build_all complete: %d steps, %d rotation symbols assigned",
405
            len(steps),
406
            sum(1 for s in steps if s.rotation_symbol is not None),
407
        )
408

409
        # Cast for type compatibility with base class return type
410
        return list[LegoPageElements](steps)
1✔
411

412
    def _filter_page_level_step_candidates(
1✔
413
        self, candidates: list[Candidate]
414
    ) -> list[Candidate]:
415
        """Filter step candidates to exclude likely subassembly steps.
416

417
        Extracts step number values from candidates and uses the generic
418
        filter_subassembly_values function to filter out subassembly steps.
419

420
        Args:
421
            candidates: All step candidates to filter
422

423
        Returns:
424
            Filtered list of candidates likely to be page-level steps
425
        """
426
        # Extract step number values from candidates
427
        items_with_values: list[tuple[int, Candidate]] = []
1✔
428
        for candidate in candidates:
1✔
429
            score = candidate.score_details
1✔
430
            if not isinstance(score, _StepScore):
1✔
UNCOV
431
                continue
×
432
            items_with_values.append((score.step_value, candidate))
1✔
433

434
        # Use generic filtering function
435
        filtered_items = filter_subassembly_values(items_with_values)
1✔
436

437
        # If filtering occurred, return only the filtered candidates
438
        if len(filtered_items) != len(items_with_values):
1✔
439
            filtered_values = [v for v, _ in filtered_items]
1✔
440
            log.debug(
1✔
441
                "[step] Filtered out likely subassembly steps, keeping values: %s",
442
                sorted(filtered_values),
443
            )
444
            return [item for _, item in filtered_items]
1✔
445

446
        # No filtering occurred, return original candidates
447
        # (includes any candidates that couldn't have their value extracted)
448
        return candidates
1✔
449

450
    def build(self, candidate: Candidate, result: ClassificationResult) -> Step:
1✔
451
        """Construct a partial Step element from a single candidate.
452

453
        This creates a Step with just step_number and parts_list. The diagram,
454
        subassemblies, and arrows are assigned later in build_all() using
455
        Hungarian matching for optimal global assignment.
456
        """
457
        score = candidate.score_details
1✔
458
        assert isinstance(score, _StepScore)
1✔
459

460
        # Validate and extract step number from parent candidate
461
        step_num_candidate = score.step_number_candidate
1✔
462

463
        step_num_elem = result.build(step_num_candidate)
1✔
464
        assert isinstance(step_num_elem, StepNumber)
1✔
465
        step_num = step_num_elem
1✔
466

467
        # Validate and extract parts list from parent candidate (if present)
468
        parts_list = None
1✔
469
        if score.parts_list_candidate:
1✔
470
            parts_list_candidate = score.parts_list_candidate
1✔
471
            parts_list_elem = result.build(parts_list_candidate)
1✔
472
            assert isinstance(parts_list_elem, PartsList)
1✔
473
            parts_list = parts_list_elem
1✔
474

475
        # Create partial step - diagram, subassemblies, arrows assigned later
476
        initial_bbox = step_num.bbox
1✔
477
        if parts_list:
1✔
478
            initial_bbox = initial_bbox.union(parts_list.bbox)
1✔
479

480
        return Step(
1✔
481
            bbox=initial_bbox,
482
            step_number=step_num,
483
            parts_list=parts_list,
484
            diagram=None,
485
            rotation_symbol=None,
486
            arrows=[],
487
            subassemblies=[],
488
        )
489

490
    def _find_and_build_diagram_for_step(
1✔
491
        self,
492
        step_num: StepNumber,
493
        parts_list: PartsList | None,
494
        result: ClassificationResult,
495
    ) -> Diagram | None:
496
        """Find and build the best diagram for this step.
497

498
        This is called at build time, after rotation symbols and subassemblies
499
        have been built (in build_all Phases 1 and 3), so they've already
500
        claimed any images/diagrams they need. We filter to only consider
501
        diagram candidates that haven't been constructed yet.
502

503
        Args:
504
            step_num: The built step number element
505
            parts_list: The built parts list element (if any)
506
            result: Classification result containing diagram candidates
507

508
        Returns:
509
            The built Diagram element, or None if no suitable diagram found
510
        """
511
        # Get all non-failed, non-constructed diagram candidates
UNCOV
512
        diagram_candidates = result.get_scored_candidates(
×
513
            "diagram", valid_only=False, exclude_failed=True
514
        )
515

516
        # Filter to only candidates that haven't been built yet
517
        # (subassemblies and rotation symbols built earlier may have claimed some)
UNCOV
518
        available_candidates = [c for c in diagram_candidates if c.constructed is None]
×
519

UNCOV
520
        if not available_candidates:
×
UNCOV
521
            log.debug(
×
522
                "[step] No diagram candidates available for step %d",
523
                step_num.value,
524
            )
UNCOV
525
            return None
×
526

527
        # Score each candidate based on position relative to step
UNCOV
528
        step_bbox = step_num.bbox
×
UNCOV
529
        best_candidate = None
×
UNCOV
530
        best_score = -float("inf")
×
531

UNCOV
532
        for candidate in available_candidates:
×
UNCOV
533
            score = self._score_step_diagram_pair(step_bbox, candidate.bbox)
×
534

UNCOV
535
            log.debug(
×
536
                "[step] Diagram candidate at %s for step %d: score=%.2f",
537
                candidate.bbox,
538
                step_num.value,
539
                score,
540
            )
541

UNCOV
542
            if score > best_score:
×
UNCOV
543
                best_score = score
×
UNCOV
544
                best_candidate = candidate
×
545

UNCOV
546
        if best_candidate is None or best_score < 0.2:
×
UNCOV
547
            log.debug(
×
548
                "[step] No suitable diagram found for step %d (best_score=%.2f)",
549
                step_num.value,
550
                best_score,
551
            )
UNCOV
552
            return None
×
553

554
        # Build the diagram
UNCOV
555
        try:
×
UNCOV
556
            diagram_elem = result.build(best_candidate)
×
UNCOV
557
            assert isinstance(diagram_elem, Diagram)
×
UNCOV
558
            log.debug(
×
559
                "[step] Built diagram at %s for step %d (score=%.2f)",
560
                diagram_elem.bbox,
561
                step_num.value,
562
                best_score,
563
            )
UNCOV
564
            return diagram_elem
×
UNCOV
565
        except CandidateFailedError as e:
×
UNCOV
566
            log.debug(
×
567
                "[step] Failed to build diagram for step %d: %s",
568
                step_num.value,
569
                e,
570
            )
UNCOV
571
            return None
×
572

573
    def _create_step_candidate(
1✔
574
        self,
575
        step_candidate: Candidate,
576
        parts_list_candidate: Candidate | None,
577
        result: ClassificationResult,
578
    ) -> Candidate | None:
579
        """Create a Step candidate (without diagram assignment).
580

581
        Diagrams are found at build time, not during scoring, to allow
582
        rotation symbols to claim small images first.
583

584
        Args:
585
            step_candidate: The StepNumber candidate for this step
586
            parts_list_candidate: The PartsList candidate to pair with (or None)
587
            result: Classification result
588

589
        Returns:
590
            The created Candidate with score but no construction, or None if
591
            the step number value cannot be extracted.
592
        """
593
        ABOVE_EPS = 2.0  # Small epsilon for "above" check
1✔
594
        ALIGNMENT_THRESHOLD_MULTIPLIER = 1.0  # Max horizontal offset
1✔
595
        DISTANCE_THRESHOLD_MULTIPLIER = 1.0  # Max vertical distance
1✔
596

597
        # Extract step number value from the candidate
598
        if not step_candidate.source_blocks:
1✔
UNCOV
599
            return None
×
600
        source_block = step_candidate.source_blocks[0]
1✔
601
        if not isinstance(source_block, Text):
1✔
UNCOV
602
            return None
×
603
        step_value = extract_step_number_value(source_block.text)
1✔
604
        if step_value is None:
1✔
UNCOV
605
            return None
×
606

607
        step_bbox = step_candidate.bbox
1✔
608
        parts_list_bbox = parts_list_candidate.bbox if parts_list_candidate else None
1✔
609

610
        # Calculate pairing scores if there's a parts_list above the step
611
        proximity_score = 0.0
1✔
612
        alignment_score = 0.0
1✔
613

614
        if (
1✔
615
            parts_list_bbox is not None
616
            and parts_list_bbox.y1 <= step_bbox.y0 + ABOVE_EPS
617
        ):
618
            # Calculate distance (how far apart vertically)
619
            distance = step_bbox.y0 - parts_list_bbox.y1
1✔
620

621
            # Calculate proximity score
622
            max_distance = step_bbox.height * DISTANCE_THRESHOLD_MULTIPLIER
1✔
623
            if max_distance > 0:
1✔
624
                proximity_score = max(0.0, 1.0 - (distance / max_distance))
1✔
625

626
            # Calculate alignment score (how well left edges align)
627
            max_alignment_diff = step_bbox.width * ALIGNMENT_THRESHOLD_MULTIPLIER
1✔
628
            left_diff = abs(parts_list_bbox.x0 - step_bbox.x0)
1✔
629
            if max_alignment_diff > 0:
1✔
630
                alignment_score = max(0.0, 1.0 - (left_diff / max_alignment_diff))
1✔
631

632
        # Create score object with candidate references
633
        # Diagrams are found at build time, not during scoring
634
        score = _StepScore(
1✔
635
            step_value=step_value,
636
            step_number_candidate=step_candidate,
637
            parts_list_candidate=parts_list_candidate,
638
            has_parts_list=parts_list_candidate is not None,
639
            step_proximity_score=proximity_score,
640
            step_alignment_score=alignment_score,
641
        )
642

643
        # Calculate combined bbox for the candidate (without diagram)
644
        combined_bbox = step_bbox
1✔
645
        if parts_list_bbox:
1✔
646
            combined_bbox = BBox.union(combined_bbox, parts_list_bbox)
1✔
647

648
        # Create candidate
649
        return Candidate(
1✔
650
            bbox=combined_bbox,
651
            label="step",
652
            score=score.overall_score(),
653
            score_details=score,
654
            source_blocks=[],
655
        )
656

657
    def _score_step_diagram_pair(
1✔
658
        self,
659
        step_bbox: BBox,
660
        diagram_bbox: BBox,
661
    ) -> float:
662
        """Score how well a diagram matches a step.
663

664
        Diagrams are typically positioned to the right of and/or below the step
665
        number. This method scores based on:
666
        - Horizontal position: prefer diagrams to the right, penalize left
667
        - Vertical position: prefer diagrams below the step header
668
        - Distance: closer is better
669

670
        Args:
671
            step_bbox: The step number bounding box
672
            diagram_bbox: The diagram bounding box to score
673

674
        Returns:
675
            Score between 0.0 and 1.0 (higher is better match)
676
        """
677
        # Reference point: bottom-right of step number
UNCOV
678
        ref_x = step_bbox.x1
×
UNCOV
679
        ref_y = step_bbox.y1
×
680

681
        # TODO Move all these constants into config, or make them adaptive?
682

683
        # Horizontal score
684
        # Diagrams to the right are preferred, but allow some overlap
UNCOV
685
        x_offset = diagram_bbox.x0 - ref_x
×
686

UNCOV
687
        if x_offset >= -50:
×
688
            # Diagram starts to the right or slightly overlapping - good
689
            # Score decreases slightly with distance to the right
UNCOV
690
            x_score = max(0.5, 1.0 - abs(x_offset) / 400.0)
×
UNCOV
691
        elif x_offset >= -200:
×
692
            # Diagram is moderately to the left - acceptable
UNCOV
693
            x_score = 0.3 + 0.2 * (1.0 + x_offset / 200.0)
×
694
        else:
695
            # Diagram is far to the left - poor match
UNCOV
696
            x_score = max(0.1, 0.3 + x_offset / 400.0)
×
697

698
        # Vertical score
699
        # Diagrams below the step header are preferred
UNCOV
700
        y_offset = diagram_bbox.y0 - ref_y
×
701

UNCOV
702
        if y_offset >= -30:
×
703
            # Diagram starts below or slightly overlapping - good
704
            # Score decreases with vertical distance
UNCOV
705
            y_score = max(0.3, 1.0 - abs(y_offset) / 300.0)
×
UNCOV
706
        elif y_offset >= -100:
×
707
            # Diagram is moderately above - less good but acceptable
UNCOV
708
            y_score = 0.2 + 0.3 * (1.0 + y_offset / 100.0)
×
709
        else:
710
            # Diagram is far above - poor match
UNCOV
711
            y_score = max(0.05, 0.2 + y_offset / 300.0)
×
712

713
        # Combined score - weight both dimensions equally
UNCOV
714
        score = 0.5 * x_score + 0.5 * y_score
×
715

UNCOV
716
        return score
×
717

718
    def _get_rotation_symbol_for_step(
1✔
719
        self,
720
        step_bbox: BBox,
721
        result: ClassificationResult,
722
    ) -> RotationSymbol | None:
723
        """Find an already-built rotation symbol within this step's area.
724

725
        Rotation symbols are built by PageClassifier before steps are processed.
726
        This method finds the already-built rotation symbol that falls within
727
        the step's bounding box.
728

729
        Args:
730
            step_bbox: The step's bounding box (including step_num, parts_list,
731
                and diagram)
732
            result: Classification result containing rotation symbol candidates
733

734
        Returns:
735
            Single RotationSymbol element for this step, or None if not found
736
        """
737
        # Get rotation_symbol candidates - they should already be built
738
        # by PageClassifier. Use valid_only=True to only get successfully
739
        # constructed rotation symbols.
UNCOV
740
        rotation_symbol_candidates = result.get_scored_candidates(
×
741
            "rotation_symbol", valid_only=True
742
        )
743

UNCOV
744
        log.debug(
×
745
            "[step] Looking for rotation symbols in step bbox %s, "
746
            "found %d built candidates",
747
            step_bbox,
748
            len(rotation_symbol_candidates),
749
        )
750

UNCOV
751
        if not rotation_symbol_candidates:
×
UNCOV
752
            return None
×
753

754
        # Find best-scoring rotation symbol overlapping the step's bbox
UNCOV
755
        overlapping_candidates = filter_overlapping(
×
756
            rotation_symbol_candidates, step_bbox
757
        )
UNCOV
758
        best_candidate = find_best_scoring(overlapping_candidates)
×
759

UNCOV
760
        if best_candidate and best_candidate.constructed is not None:
×
UNCOV
761
            rotation_symbol = best_candidate.constructed
×
UNCOV
762
            assert isinstance(rotation_symbol, RotationSymbol)
×
UNCOV
763
            log.debug(
×
764
                "[step] Found rotation symbol at %s (score=%.2f)",
765
                rotation_symbol.bbox,
766
                best_candidate.score,
767
            )
UNCOV
768
            return rotation_symbol
×
769

UNCOV
770
        log.debug("[step] No rotation symbol found in step bbox %s", step_bbox)
×
UNCOV
771
        return None
×
772

773
    def _get_arrows_for_step(
1✔
774
        self,
775
        step_num: StepNumber,
776
        diagram: Diagram | None,
777
        result: ClassificationResult,
778
    ) -> list[Arrow]:
779
        """Find arrows associated with this step.
780

781
        Looks for arrow candidates that are positioned near the step's diagram
782
        or step number. Typically these are arrows pointing from subassembly
783
        callout boxes to the main diagram.
784

785
        Args:
786
            step_num: The step number element
787
            diagram: The diagram element (if any)
788
            result: Classification result containing arrow candidates
789

790
        Returns:
791
            List of Arrow elements for this step
792
        """
793
        arrow_candidates = result.get_scored_candidates(
1✔
794
            "arrow", valid_only=False, exclude_failed=True
795
        )
796

797
        log.debug(
1✔
798
            "[step] Looking for arrows for step %d, found %d candidates",
799
            step_num.value,
800
            len(arrow_candidates),
801
        )
802

803
        if not arrow_candidates:
1✔
804
            return []
1✔
805

806
        # Determine search region: prefer diagram area, fallback to step area
807
        search_bbox = diagram.bbox if diagram else step_num.bbox
1✔
808

809
        # Expand search region to catch arrows near the diagram
810
        # Use a larger margin than rotation symbols since arrows can extend further
811
        search_region = search_bbox.expand(100.0)
1✔
812

813
        log.debug(
1✔
814
            "[step] Arrow search region for step %d: %s",
815
            step_num.value,
816
            search_region,
817
        )
818

819
        # Find arrows within or overlapping the search region
820
        arrows: list[Arrow] = []
1✔
821
        overlapping_candidates = filter_overlapping(arrow_candidates, search_region)
1✔
822

823
        for candidate in overlapping_candidates:
1✔
824
            log.debug(
1✔
825
                "[step]   Arrow candidate at %s, overlaps=True, score=%.2f",
826
                candidate.bbox,
827
                candidate.score,
828
            )
829
            try:
1✔
830
                arrow = result.build(candidate)
1✔
831
                assert isinstance(arrow, Arrow)
1✔
832
                arrows.append(arrow)
1✔
833
            except CandidateFailedError:
1✔
834
                # Arrow lost conflict to another arrow (they share source blocks)
835
                # This is expected when multiple arrows overlap - skip it
836
                log.debug(
1✔
837
                    "[step]   Arrow candidate at %s failed (conflict), skipping",
838
                    candidate.bbox,
839
                )
840
                continue
1✔
841

842
        log.debug(
1✔
843
            "[step] Found %d arrows for step %d",
844
            len(arrows),
845
            step_num.value,
846
        )
847
        return arrows
1✔
848

849
    def _get_subassemblies_for_step(
1✔
850
        self,
851
        step_num: StepNumber,
852
        diagram: Diagram | None,
853
        result: ClassificationResult,
854
    ) -> list[SubAssembly]:
855
        """Get already-built subassemblies that belong to this step.
856

857
        Subassemblies are built in build_all() before steps. This method finds
858
        subassemblies that are positioned near this step's diagram and haven't
859
        been claimed by another step yet.
860

861
        Args:
862
            step_num: The step number element
863
            diagram: The diagram element (if any)
864
            result: Classification result containing subassembly candidates
865

866
        Returns:
867
            List of SubAssembly elements for this step
868
        """
869
        # Get all built subassemblies
UNCOV
870
        all_subassemblies: list[SubAssembly] = []
×
UNCOV
871
        for sa_candidate in result.get_scored_candidates(
×
872
            "subassembly", valid_only=True
873
        ):
UNCOV
874
            assert sa_candidate.constructed is not None
×
UNCOV
875
            assert isinstance(sa_candidate.constructed, SubAssembly)
×
UNCOV
876
            all_subassemblies.append(sa_candidate.constructed)
×
877

UNCOV
878
        log.debug(
×
879
            "[step] Looking for subassemblies for step %d, found %d built",
880
            step_num.value,
881
            len(all_subassemblies),
882
        )
883

UNCOV
884
        if not all_subassemblies:
×
UNCOV
885
            return []
×
886

887
        # Determine search region based on step_number and diagram
UNCOV
888
        reference_bbox = diagram.bbox.union(step_num.bbox) if diagram else step_num.bbox
×
889

UNCOV
890
        page_bbox = result.page_data.bbox
×
891

892
        # Expand search region around the reference area
893
        # Horizontally: search the full page width
894
        # Vertically: search a region around the reference bbox
UNCOV
895
        vertical_margin = reference_bbox.height
×
UNCOV
896
        search_region = BBox(
×
897
            x0=page_bbox.x0,
898
            y0=max(page_bbox.y0, reference_bbox.y0 - vertical_margin),
899
            x1=page_bbox.x1,
900
            y1=min(page_bbox.y1, reference_bbox.y1 + vertical_margin),
901
        )
902

UNCOV
903
        log.debug(
×
904
            "[step] SubAssembly search region for step %d: %s",
905
            step_num.value,
906
            search_region,
907
        )
908

909
        # Find subassemblies that overlap the search region
UNCOV
910
        subassemblies: list[SubAssembly] = []
×
UNCOV
911
        for subassembly in all_subassemblies:
×
UNCOV
912
            if subassembly.bbox.overlaps(search_region):
×
UNCOV
913
                log.debug(
×
914
                    "[step]   SubAssembly at %s overlaps search region",
915
                    subassembly.bbox,
916
                )
UNCOV
917
                subassemblies.append(subassembly)
×
918

UNCOV
919
        log.debug(
×
920
            "[step] Found %d subassemblies for step %d",
921
            len(subassemblies),
922
            step_num.value,
923
        )
UNCOV
924
        return subassemblies
×
925

926
    def _compute_step_bbox(
1✔
927
        self,
928
        step_num: StepNumber,
929
        parts_list: PartsList | None,
930
        diagram: Diagram | None,
931
        subassemblies: list[SubAssembly],
932
        page_bbox: BBox,
933
    ) -> BBox:
934
        """Compute the overall bounding box for the Step.
935

936
        This encompasses the step number, parts list (if any), diagram (if any),
937
        and all subassemblies.
938
        The result is clipped to the page bounds to handle elements that extend
939
        slightly off-page (e.g., arrows in diagrams).
940

941
        Args:
942
            step_num: The step number element
943
            parts_list: The parts list (if any)
944
            diagram: The diagram element (if any)
945
            subassemblies: List of subassemblies for this step
946
            page_bbox: The page bounding box to clip to
947

948
        Returns:
949
            Combined bounding box, clipped to page bounds
950
        """
951
        bboxes = [step_num.bbox]
1✔
952
        if parts_list:
1✔
953
            bboxes.append(parts_list.bbox)
1✔
954
        if diagram:
1✔
955
            bboxes.append(diagram.bbox)
1✔
956
        for sa in subassemblies:
1✔
957
            bboxes.append(sa.bbox)
1✔
958

959
        return BBox.union_all(bboxes).clip_to(page_bbox)
1✔
960

961

962
def assign_diagrams_to_steps(
1✔
963
    steps: list[Step],
964
    diagrams: list[Diagram],
965
    max_distance: float = 500.0,
966
) -> None:
967
    """Assign diagrams to steps using Hungarian algorithm.
968

969
    Uses optimal bipartite matching to pair diagrams with steps based on
970
    a scoring function that considers:
971
    - Distance from step_number to diagram (closer is better)
972
    - Relative position (diagram typically below/right of step_number)
973

974
    This function mutates the Step objects in place, setting their diagram
975
    attribute.
976

977
    Args:
978
        steps: List of Step objects to assign diagrams to
979
        diagrams: List of Diagram objects to assign
980
        max_distance: Maximum distance for a valid assignment. Pairs with distance
981
            greater than this will not be matched.
982
    """
983
    if not steps or not diagrams:
1✔
984
        log.debug(
1✔
985
            "[step] No diagram assignment needed: steps=%d, diagrams=%d",
986
            len(steps),
987
            len(diagrams),
988
        )
989
        return
1✔
990

991
    n_steps = len(steps)
1✔
992
    n_diagrams = len(diagrams)
1✔
993

994
    log.debug(
1✔
995
        "[step] Running Hungarian matching for diagrams: %d steps, %d diagrams",
996
        n_steps,
997
        n_diagrams,
998
    )
999

1000
    # Build cost matrix: rows = diagrams, cols = steps
1001
    # Lower cost = better match
1002
    cost_matrix = np.zeros((n_diagrams, n_steps))
1✔
1003

1004
    for i, diag in enumerate(diagrams):
1✔
1005
        diag_center = diag.bbox.center
1✔
1006
        for j, step in enumerate(steps):
1✔
1007
            step_num_center = step.step_number.bbox.center
1✔
1008

1009
            # Base cost is distance from step_number to diagram center
1010
            distance = (
1✔
1011
                (step_num_center[0] - diag_center[0]) ** 2
1012
                + (step_num_center[1] - diag_center[1]) ** 2
1013
            ) ** 0.5
1014

1015
            # Apply position penalty: prefer diagrams that are below or to the
1016
            # right of the step_number (typical LEGO instruction layout)
1017
            # If diagram is above the step_number, add penalty
1018
            if diag_center[1] < step_num_center[1] - 50:  # Diagram above step
1✔
1019
                distance *= 1.5  # Penalty for being above
1✔
1020

1021
            cost_matrix[i, j] = distance
1✔
1022
            log.debug(
1✔
1023
                "[step]   Cost diagram[%d] at %s to step[%d]: %.1f",
1024
                i,
1025
                diag.bbox,
1026
                step.step_number.value,
1027
                distance,
1028
            )
1029

1030
    # Apply max_distance threshold
1031
    high_cost = max_distance * 10
1✔
1032
    cost_matrix_thresholded = np.where(
1✔
1033
        cost_matrix > max_distance, high_cost, cost_matrix
1034
    )
1035

1036
    # Run Hungarian algorithm
1037
    row_indices, col_indices = linear_sum_assignment(cost_matrix_thresholded)
1✔
1038

1039
    # Assign diagrams to steps based on the matching
1040
    for row_idx, col_idx in zip(row_indices, col_indices, strict=True):
1✔
1041
        if cost_matrix[row_idx, col_idx] <= max_distance:
1✔
1042
            diag = diagrams[row_idx]
1✔
1043
            step = steps[col_idx]
1✔
1044
            # Use object.__setattr__ because Step is a frozen Pydantic model
1045
            object.__setattr__(step, "diagram", diag)
1✔
1046
            log.debug(
1✔
1047
                "[step] Assigned diagram at %s to step %d (cost=%.1f)",
1048
                diag.bbox,
1049
                step.step_number.value,
1050
                cost_matrix[row_idx, col_idx],
1051
            )
1052
        else:
UNCOV
1053
            log.debug(
×
1054
                "[step] Skipped diagram assignment: diagram[%d] to step[%d] "
1055
                "cost=%.1f > max_distance=%.1f",
1056
                row_idx,
1057
                col_idx,
1058
                cost_matrix[row_idx, col_idx],
1059
                max_distance,
1060
            )
1061

1062

1063
def _has_divider_between(
1✔
1064
    bbox1: BBox,
1065
    bbox2: BBox,
1066
    dividers: list[Divider],
1067
) -> bool:
1068
    """Check if there is a divider line between two bboxes.
1069

1070
    A divider is considered "between" if it crosses the line connecting
1071
    the centers of the two bboxes.
1072

1073
    Args:
1074
        bbox1: First bounding box
1075
        bbox2: Second bounding box
1076
        dividers: List of dividers to check
1077

1078
    Returns:
1079
        True if a divider is between the two bboxes
1080
    """
1081
    center1 = bbox1.center
1✔
1082
    center2 = bbox2.center
1✔
1083

1084
    for divider in dividers:
1✔
1085
        div_bbox = divider.bbox
1✔
1086

1087
        # Check if divider is vertical (separates left/right)
1088
        if div_bbox.width < div_bbox.height * 0.2:  # Vertical line
1✔
1089
            # Check if divider x is between the two centers
1090
            min_x = min(center1[0], center2[0])
1✔
1091
            max_x = max(center1[0], center2[0])
1✔
1092
            div_x = div_bbox.center[0]
1✔
1093
            if min_x < div_x < max_x:
1✔
1094
                # Check if divider spans the y range
1095
                min_y = min(center1[1], center2[1])
1✔
1096
                max_y = max(center1[1], center2[1])
1✔
1097
                if div_bbox.y0 <= max_y and div_bbox.y1 >= min_y:
1✔
1098
                    return True
1✔
1099

1100
        # Check if divider is horizontal (separates top/bottom)
UNCOV
1101
        elif div_bbox.height < div_bbox.width * 0.2:  # Horizontal line
×
1102
            # Check if divider y is between the two centers
UNCOV
1103
            min_y = min(center1[1], center2[1])
×
UNCOV
1104
            max_y = max(center1[1], center2[1])
×
UNCOV
1105
            div_y = div_bbox.center[1]
×
UNCOV
1106
            if min_y < div_y < max_y:
×
1107
                # Check if divider spans the x range
UNCOV
1108
                min_x = min(center1[0], center2[0])
×
UNCOV
1109
                max_x = max(center1[0], center2[0])
×
UNCOV
1110
                if div_bbox.x0 <= max_x and div_bbox.x1 >= min_x:
×
UNCOV
1111
                    return True
×
1112

1113
    return False
1✔
1114

1115

1116
def assign_subassemblies_to_steps(
1✔
1117
    steps: list[Step],
1118
    subassemblies: list[SubAssembly],
1119
    dividers: list[Divider],
1120
    max_distance: float = 400.0,
1121
) -> None:
1122
    """Assign subassemblies to steps using Hungarian algorithm.
1123

1124
    Uses optimal bipartite matching to pair subassemblies with steps based on:
1125
    - Distance from step's diagram to subassembly (closer is better)
1126
    - No divider between the subassembly and the step's diagram
1127

1128
    This function mutates the Step objects in place, adding to their
1129
    subassemblies list.
1130

1131
    Args:
1132
        steps: List of Step objects to assign subassemblies to
1133
        subassemblies: List of SubAssembly objects to assign
1134
        dividers: List of Divider objects for obstruction checking
1135
        max_distance: Maximum distance for a valid assignment
1136
    """
1137
    if not steps or not subassemblies:
1✔
1138
        log.debug(
1✔
1139
            "[step] No subassembly assignment needed: steps=%d, subassemblies=%d",
1140
            len(steps),
1141
            len(subassemblies),
1142
        )
1143
        return
1✔
1144

1145
    n_steps = len(steps)
1✔
1146
    n_subassemblies = len(subassemblies)
1✔
1147

1148
    log.debug(
1✔
1149
        "[step] Running Hungarian matching for subassemblies: "
1150
        "%d steps, %d subassemblies",
1151
        n_steps,
1152
        n_subassemblies,
1153
    )
1154

1155
    # Build cost matrix: rows = subassemblies, cols = steps
1156
    cost_matrix = np.zeros((n_subassemblies, n_steps))
1✔
1157
    high_cost = max_distance * 10
1✔
1158

1159
    for i, sa in enumerate(subassemblies):
1✔
1160
        sa_center = sa.bbox.center
1✔
1161
        for j, step in enumerate(steps):
1✔
1162
            # Use diagram center if available, otherwise step bbox center
1163
            if step.diagram:
1✔
1164
                target_bbox = step.diagram.bbox
1✔
1165
                target_center = target_bbox.center
1✔
1166
            else:
UNCOV
1167
                target_bbox = step.bbox
×
UNCOV
1168
                target_center = step.step_number.bbox.center
×
1169

1170
            # Base cost is distance from diagram/step to subassembly center
1171
            distance = (
1✔
1172
                (target_center[0] - sa_center[0]) ** 2
1173
                + (target_center[1] - sa_center[1]) ** 2
1174
            ) ** 0.5
1175

1176
            # Check for divider between subassembly and step's diagram
1177
            if _has_divider_between(sa.bbox, target_bbox, dividers):
1✔
1178
                # High cost if there's a divider between
1179
                distance = high_cost
1✔
1180
                log.debug(
1✔
1181
                    "[step]   Divider between subassembly[%d] at %s and step[%d]",
1182
                    i,
1183
                    sa.bbox,
1184
                    step.step_number.value,
1185
                )
1186

1187
            cost_matrix[i, j] = distance
1✔
1188
            log.debug(
1✔
1189
                "[step]   Cost subassembly[%d] at %s to step[%d]: %.1f",
1190
                i,
1191
                sa.bbox,
1192
                step.step_number.value,
1193
                distance,
1194
            )
1195

1196
    # Apply max_distance threshold
1197
    cost_matrix_thresholded = np.where(
1✔
1198
        cost_matrix > max_distance, high_cost, cost_matrix
1199
    )
1200

1201
    # Run Hungarian algorithm
1202
    row_indices, col_indices = linear_sum_assignment(cost_matrix_thresholded)
1✔
1203

1204
    # Assign subassemblies to steps based on the matching
1205
    # Note: Unlike diagrams, multiple subassemblies can be assigned to one step
1206
    # The Hungarian algorithm gives us the best 1:1 matching, but we may want
1207
    # to assign additional nearby subassemblies too
1208

1209
    # First, do the 1:1 optimal assignment
1210
    assigned_subassemblies: set[int] = set()
1✔
1211
    for row_idx, col_idx in zip(row_indices, col_indices, strict=True):
1✔
1212
        if cost_matrix[row_idx, col_idx] <= max_distance:
1✔
1213
            sa = subassemblies[row_idx]
1✔
1214
            step = steps[col_idx]
1✔
1215
            # Add to step's subassemblies list
1216
            new_subassemblies = list(step.subassemblies) + [sa]
1✔
1217
            object.__setattr__(step, "subassemblies", new_subassemblies)
1✔
1218
            assigned_subassemblies.add(row_idx)
1✔
1219
            log.debug(
1✔
1220
                "[step] Assigned subassembly at %s to step %d (cost=%.1f)",
1221
                sa.bbox,
1222
                step.step_number.value,
1223
                cost_matrix[row_idx, col_idx],
1224
            )
1225

1226
    # Then, try to assign remaining unassigned subassemblies to their nearest step
1227
    # (if within max_distance and no divider between)
1228
    for i, sa in enumerate(subassemblies):
1✔
1229
        if i in assigned_subassemblies:
1✔
1230
            continue
1✔
1231

1232
        # Find the step with lowest cost for this subassembly
1233
        best_step_idx = None
1✔
1234
        best_cost = high_cost
1✔
1235
        for j in range(len(steps)):
1✔
1236
            if cost_matrix[i, j] < best_cost:
1✔
1237
                best_cost = cost_matrix[i, j]
1✔
1238
                best_step_idx = j
1✔
1239

1240
        if best_step_idx is not None and best_cost <= max_distance:
1✔
1241
            step = steps[best_step_idx]
1✔
1242
            new_subassemblies = list(step.subassemblies) + [sa]
1✔
1243
            object.__setattr__(step, "subassemblies", new_subassemblies)
1✔
1244
            log.debug(
1✔
1245
                "[step] Assigned extra subassembly at %s to step %d (cost=%.1f)",
1246
                sa.bbox,
1247
                step.step_number.value,
1248
                best_cost,
1249
            )
1250

1251

1252
def assign_rotation_symbols_to_steps(
1✔
1253
    steps: list[Step],
1254
    rotation_symbols: list[RotationSymbol],
1255
    max_distance: float = 300.0,
1256
) -> None:
1257
    """Assign rotation symbols to steps using Hungarian algorithm.
1258

1259
    Uses optimal bipartite matching to pair rotation symbols with steps based on
1260
    minimum distance from the rotation symbol to the step's diagram (or step bbox
1261
    center if no diagram).
1262

1263
    This function mutates the Step objects in place, setting their rotation_symbol
1264
    attribute.
1265

1266
    Args:
1267
        steps: List of Step objects to assign rotation symbols to
1268
        rotation_symbols: List of RotationSymbol objects to assign
1269
        max_distance: Maximum distance for a valid assignment. Pairs with distance
1270
            greater than this will not be matched.
1271
    """
1272
    if not steps or not rotation_symbols:
1✔
1273
        log.debug(
1✔
1274
            "[step] No rotation symbol assignment needed: "
1275
            "steps=%d, rotation_symbols=%d",
1276
            len(steps),
1277
            len(rotation_symbols),
1278
        )
1279
        return
1✔
1280

1281
    n_steps = len(steps)
1✔
1282
    n_symbols = len(rotation_symbols)
1✔
1283

1284
    log.debug(
1✔
1285
        "[step] Running Hungarian matching: %d steps, %d rotation symbols",
1286
        n_steps,
1287
        n_symbols,
1288
    )
1289

1290
    # Build cost matrix: rows = rotation symbols, cols = steps
1291
    # Cost = distance from rotation symbol center to step's diagram center
1292
    # (or step bbox center if no diagram)
1293
    cost_matrix = np.zeros((n_symbols, n_steps))
1✔
1294

1295
    for i, rs in enumerate(rotation_symbols):
1✔
1296
        rs_center = rs.bbox.center
1✔
1297
        for j, step in enumerate(steps):
1✔
1298
            # Use diagram center if available, otherwise step bbox center
1299
            if step.diagram:
1✔
1300
                target_center = step.diagram.bbox.center
1✔
1301
            else:
UNCOV
1302
                target_center = step.bbox.center
×
1303

1304
            # Euclidean distance
1305
            distance = (
1✔
1306
                (rs_center[0] - target_center[0]) ** 2
1307
                + (rs_center[1] - target_center[1]) ** 2
1308
            ) ** 0.5
1309

1310
            cost_matrix[i, j] = distance
1✔
1311
            log.debug(
1✔
1312
                "[step]   Distance from rotation_symbol[%d] at %s to step[%d]: %.1f",
1313
                i,
1314
                rs.bbox,
1315
                step.step_number.value,
1316
                distance,
1317
            )
1318

1319
    # Apply max_distance threshold by setting costs above threshold to a high value
1320
    # This prevents assignments that are too far apart
1321
    high_cost = max_distance * 10  # Arbitrary large value
1✔
1322
    cost_matrix_thresholded = np.where(
1✔
1323
        cost_matrix > max_distance, high_cost, cost_matrix
1324
    )
1325

1326
    # Run Hungarian algorithm
1327
    row_indices, col_indices = linear_sum_assignment(cost_matrix_thresholded)
1✔
1328

1329
    # Assign rotation symbols to steps based on the matching
1330
    for row_idx, col_idx in zip(row_indices, col_indices, strict=True):
1✔
1331
        # Check if this assignment is within the max_distance threshold
1332
        if cost_matrix[row_idx, col_idx] <= max_distance:
1✔
1333
            rs = rotation_symbols[row_idx]
1✔
1334
            step = steps[col_idx]
1✔
1335
            step.rotation_symbol = rs
1✔
1336
            # Expand step bbox to include the rotation symbol
1337
            step.bbox = step.bbox.union(rs.bbox)
1✔
1338
            log.debug(
1✔
1339
                "[step] Assigned rotation symbol at %s to step %d "
1340
                "(distance=%.1f, new step bbox=%s)",
1341
                rs.bbox,
1342
                step.step_number.value,
1343
                cost_matrix[row_idx, col_idx],
1344
                step.bbox,
1345
            )
1346
        else:
UNCOV
1347
            log.debug(
×
1348
                "[step] Skipped assignment: rotation_symbol[%d] to step[%d] "
1349
                "distance=%.1f > max_distance=%.1f",
1350
                row_idx,
1351
                col_indices[row_idx] if row_idx < len(col_indices) else -1,
1352
                cost_matrix[row_idx, col_idx],
1353
                max_distance,
1354
            )
1355

1356

1357
def filter_subassembly_values[T](
1✔
1358
    items: list[tuple[int, T]],
1359
    *,
1360
    min_gap: int = 3,
1361
    max_subassembly_start: int = 3,
1362
) -> list[tuple[int, T]]:
1363
    """Filter items to exclude likely subassembly values.
1364

1365
    Subassembly steps (e.g., step 1, 2 inside a subassembly box) often appear
1366
    alongside higher-numbered page-level steps (e.g., 15, 16). This creates
1367
    sequences like [1, 2, 15, 16] which cannot all be page-level steps.
1368

1369
    This function identifies such cases by detecting a significant gap in values
1370
    and filtering out the lower-numbered items that are likely subassembly steps.
1371

1372
    Args:
1373
        items: List of (value, item) tuples where value is the step number
1374
            and item is any associated data (e.g., a Candidate).
1375
        min_gap: Minimum gap size to trigger filtering (exclusive).
1376
            Default is 3, so gaps > 3 trigger filtering.
1377
        max_subassembly_start: Maximum starting value for subassembly detection.
1378
            If min value is <= this, filtering is applied. Default is 3.
1379

1380
    Returns:
1381
        Filtered list of (value, item) tuples, keeping only the higher values
1382
        if a subassembly pattern is detected. Returns original list if no
1383
        filtering is needed.
1384

1385
    Examples:
1386
        >>> filter_subassembly_values([(1, "a"), (2, "b"), (15, "c"), (16, "d")])
1387
        [(15, "c"), (16, "d")]
1388

1389
        >>> filter_subassembly_values([(5, "a"), (6, "b"), (15, "c")])
1390
        [(5, "a"), (6, "b"), (15, "c")]  # min=5 > 3, no filtering
1391
    """
1392
    if len(items) <= 1:
1✔
1393
        return items
1✔
1394

1395
    # Sort by value
1396
    sorted_items = sorted(items, key=lambda x: x[0])
1✔
1397
    values = [v for v, _ in sorted_items]
1✔
1398

1399
    # Find the largest gap between consecutive values
1400
    max_gap_size = 0
1✔
1401
    max_gap_index = -1
1✔
1402
    for i in range(len(values) - 1):
1✔
1403
        gap = values[i + 1] - values[i]
1✔
1404
        if gap > max_gap_size:
1✔
1405
            max_gap_size = gap
1✔
1406
            max_gap_index = i
1✔
1407

1408
    # Check if filtering should occur:
1409
    # 1. Gap must be larger than min_gap
1410
    # 2. The minimum value must be <= max_subassembly_start
1411
    if max_gap_size > min_gap and max_gap_index >= 0:
1✔
1412
        min_value = values[0]
1✔
1413
        if min_value <= max_subassembly_start:
1✔
1414
            threshold = values[max_gap_index + 1]
1✔
1415
            return [(v, item) for v, item in sorted_items if v >= threshold]
1✔
1416

1417
    return items
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc