• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

bramp / build-along / 20010582393

07 Dec 2025 09:22PM UTC coverage: 90.299% (-0.02%) from 90.316%
20010582393

push

github

bramp
refactor(classifiers): use BBox helpers filter_contained and filter_overlapping

5 of 8 new or added lines in 2 files covered. (62.5%)

144 existing lines in 10 files now uncovered.

10779 of 11937 relevant lines covered (90.3%)

0.9 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.2
/src/build_a_long/pdf_extract/classifier/steps/step_classifier.py
1
"""
2
Step classifier.
3

4
Purpose
5
-------
6
Identify complete Step structures by combining step_number, parts_list, and diagram
7
elements. A Step represents a single building instruction comprising:
8
- A StepNumber label
9
- An optional PartsList (the parts needed for this step)
10
- A Diagram (the main instruction graphic showing what to build)
11

12
We look for step_numbers and attempt to pair them with nearby parts_lists and
13
identify the appropriate diagram region for each step.
14

15
Debugging
16
---------
17
Set environment variables to aid investigation without code changes:
18

19
- LOG_LEVEL=DEBUG
20
    Enables DEBUG-level logging (if not already configured by caller).
21
"""
22

23
import logging
1✔
24

25
import numpy as np
1✔
26
from scipy.optimize import linear_sum_assignment
1✔
27

28
from build_a_long.pdf_extract.classifier.candidate import Candidate
1✔
29
from build_a_long.pdf_extract.classifier.classification_result import (
1✔
30
    CandidateFailedError,
31
    ClassificationResult,
32
)
33
from build_a_long.pdf_extract.classifier.label_classifier import (
1✔
34
    LabelClassifier,
35
)
36
from build_a_long.pdf_extract.classifier.score import (
1✔
37
    Score,
38
    Weight,
39
    find_best_scoring,
40
)
41
from build_a_long.pdf_extract.classifier.text import (
1✔
42
    extract_step_number_value,
43
)
44
from build_a_long.pdf_extract.extractor.bbox import BBox, filter_overlapping
1✔
45
from build_a_long.pdf_extract.extractor.lego_page_elements import (
1✔
46
    Arrow,
47
    Diagram,
48
    LegoPageElements,
49
    PartsList,
50
    RotationSymbol,
51
    Step,
52
    StepNumber,
53
    SubAssembly,
54
)
55
from build_a_long.pdf_extract.extractor.page_blocks import Text
1✔
56

57
log = logging.getLogger(__name__)
1✔
58

59

60
class _StepScore(Score):
    """Score details for one StepNumber / PartsList pairing.

    Stores the two candidates being paired plus the proximity and alignment
    sub-scores that are combined by :meth:`overall_score`.
    """

    step_value: int
    """The parsed step number value (e.g., 1, 2, 3)."""

    step_number_candidate: Candidate
    """The step number candidate this step is associated with."""

    parts_list_candidate: Candidate | None
    """The parts list candidate paired with this step (if any)."""

    has_parts_list: bool
    """Whether this step has an associated parts list."""

    step_proximity_score: float
    """Score based on proximity to the PartsList above (0.0-1.0).
    1.0 for closest proximity, 0.0 if very far. 0.0 if no parts list."""

    step_alignment_score: float
    """Score based on left-edge alignment with PartsList above (0.0-1.0).
    1.0 is perfect alignment, 0.0 is very misaligned. 0.0 if no parts list."""

    def score(self) -> Weight:
        """Return the pairing score; identical to :meth:`overall_score`."""
        return self.overall_score()

    def overall_score(self) -> float:
        """Combine the pairing sub-scores into a single quality value.

        A step without a parts list gets a flat 0.3. A step with a parts list
        starts at 0.5 and gains up to another 0.5 from the mean of the
        proximity and alignment scores, so paired steps outrank unpaired
        ones. Diagrams play no part in this score.
        """
        if not self.has_parts_list:
            # Flat fallback score, lower than any paired step's base of 0.5.
            return 0.3

        mean_pairing = (self.step_proximity_score + self.step_alignment_score) / 2.0
        return 0.5 + 0.5 * mean_pairing

    def sort_key(self) -> tuple[float, int]:
        """Return a key that orders scores best-first.

        The overall score is negated so that an ascending sort puts higher
        scores first; the step value breaks ties in favour of lower step
        numbers.
        """
        return (-self.overall_score(), self.step_value)
111

112

113
class StepClassifier(LabelClassifier):
1✔
114
    """Classifier for complete Step structures."""
115

116
    output = "step"
1✔
117
    requires = frozenset(
1✔
118
        {"step_number", "parts_list", "diagram", "rotation_symbol", "subassembly"}
119
    )
120

121
    def _score(self, result: ClassificationResult) -> None:
        """Register a "step" candidate for every StepNumber pairing option.

        Each non-failed step_number candidate is paired with every non-failed
        parts_list candidate, and once more with no parts list as a fallback.
        All candidates that could be created are added to ``result``; no
        deduplication by step value happens here.

        Args:
            result: Classification result to read step_number / parts_list
                candidates from and to add the new step candidates to.
        """
        step_number_candidates = result.get_scored_candidates(
            "step_number", valid_only=False, exclude_failed=True
        )
        if not step_number_candidates:
            # No step numbers means there is nothing to pair.
            return

        parts_list_candidates = result.get_scored_candidates(
            "parts_list", valid_only=False, exclude_failed=True
        )

        log.debug(
            "[step] page=%s step_candidates=%d parts_list_candidates=%d",
            result.page_data.page_number,
            len(step_number_candidates),
            len(parts_list_candidates),
        )

        # Every parts list is a pairing option; the trailing None is the
        # "no parts list" fallback and is tried last for each step number.
        pairing_options = [*parts_list_candidates, None]

        all_candidates: list[Candidate] = []
        for step_candidate in step_number_candidates:
            for option in pairing_options:
                created = self._create_step_candidate(step_candidate, option, result)
                if created:
                    all_candidates.append(created)

        # Register everything only after all candidates have been created.
        for created in all_candidates:
            result.add_candidate(created)

        log.debug(
            "[step] Created %d step candidates",
            len(all_candidates),
        )
173

174
    def build_all(self, result: ClassificationResult) -> list[LegoPageElements]:
        """Build every Step on the page in a coordinated order.

        The work happens in four phases:

        1. Build all rotation_symbol candidates.
        2. Build all parts_list candidates.
        3. Build step candidates best-score-first, keeping at most one Step
           per step value (deduplication happens here, not during scoring).
        4. Collect the built rotation symbols and hand them, together with
           the steps, to ``assign_rotation_symbols_to_steps``.

        Phases 1 and 2 run before any step is built so those elements are
        constructed first. Failures while building an individual candidate
        are logged at DEBUG level and skipped.

        Args:
            result: Classification result holding all scored candidates.

        Returns:
            The successfully built Step elements, sorted by step number.
        """
        # Phases 1 and 2: build prerequisite elements before any step.
        # Each entry is (label, success log format, failure log format).
        prerequisite_phases = (
            (
                "rotation_symbol",
                "[step] Built rotation symbol at %s (score=%.2f)",
                "[step] Failed to construct rotation_symbol candidate at %s: %s",
            ),
            (
                "parts_list",
                "[step] Built parts_list at %s (score=%.2f)",
                "[step] Failed to construct parts_list candidate at %s: %s",
            ),
        )
        for label, built_msg, failed_msg in prerequisite_phases:
            for prerequisite in result.get_scored_candidates(
                label, valid_only=False, exclude_failed=True
            ):
                try:
                    result.build(prerequisite)
                    log.debug(built_msg, prerequisite.bbox, prerequisite.score)
                except Exception as exc:
                    # Best effort: one bad candidate must not abort the page.
                    log.debug(failed_msg, prerequisite.bbox, exc)

        # Phase 3: build steps, highest score first, one per step value.
        page_level_candidates = self._filter_page_level_step_candidates(
            result.get_scored_candidates(
                "step", valid_only=False, exclude_failed=True
            )
        )
        ranked_candidates = sorted(
            page_level_candidates,
            key=lambda c: c.score,
            reverse=True,
        )

        steps: list[Step] = []
        seen_values: set[int] = set()

        for step_candidate in ranked_candidates:
            details = step_candidate.score_details
            if not isinstance(details, _StepScore):
                continue

            step_value = details.step_value
            if step_value in seen_values:
                # A better-scoring candidate already produced this step.
                log.debug(
                    "[step] Skipping duplicate step %d candidate (score=%.2f)",
                    step_value,
                    step_candidate.score,
                )
                continue

            try:
                built = result.build(step_candidate)
                assert isinstance(built, Step)
                steps.append(built)
                seen_values.add(step_value)
                log.debug(
                    "[step] Built step %d (parts_list=%s, score=%.2f)",
                    step_value,
                    "no" if details.parts_list_candidate is None else "yes",
                    step_candidate.score,
                )
            except Exception as exc:
                # The value stays unclaimed so a lower-ranked candidate for
                # the same step number can still be tried.
                log.debug(
                    "[step] Failed to construct step %d candidate: %s",
                    step_value,
                    exc,
                )

        steps.sort(key=lambda step: step.step_number.value)

        # Phase 4: gather the built rotation symbols and assign them to steps.
        rotation_symbols: list[RotationSymbol] = []
        for built_rs in result.get_scored_candidates(
            "rotation_symbol", valid_only=True
        ):
            symbol = built_rs.constructed
            assert symbol is not None
            assert isinstance(symbol, RotationSymbol)
            rotation_symbols.append(symbol)

        assign_rotation_symbols_to_steps(steps, rotation_symbols)

        log.debug(
            "[step] build_all complete: %d steps, %d rotation symbols assigned",
            len(steps),
            sum(1 for s in steps if s.rotation_symbol is not None),
        )

        # Copy into a list typed to match the base class return type.
        return list[LegoPageElements](steps)
310

311
    def _filter_page_level_step_candidates(
        self, candidates: list[Candidate]
    ) -> list[Candidate]:
        """Drop step candidates that look like subassembly steps.

        Candidates whose score details are a ``_StepScore`` are passed, paired
        with their step value, to ``filter_subassembly_values``. If that call
        removes anything, only the surviving candidates are returned;
        otherwise the input list is returned untouched (including candidates
        that had no ``_StepScore``).

        Args:
            candidates: All step candidates to filter.

        Returns:
            The candidates considered to be page-level steps.
        """
        valued_candidates: list[tuple[int, Candidate]] = [
            (c.score_details.step_value, c)
            for c in candidates
            if isinstance(c.score_details, _StepScore)
        ]

        kept = filter_subassembly_values(valued_candidates)

        if len(kept) == len(valued_candidates):
            # Nothing was filtered out: hand back the original list as-is.
            return candidates

        log.debug(
            "[step] Filtered out likely subassembly steps, keeping values: %s",
            sorted(value for value, _ in kept),
        )
        return [kept_candidate for _, kept_candidate in kept]
348

349
    def build(self, candidate: Candidate, result: ClassificationResult) -> Step:
        """Construct a single Step from a scored step candidate.

        Builds the step number and (if paired) the parts list referenced by
        the candidate's ``_StepScore``, then looks up a diagram, computes the
        step's bounding box, and collects arrows and subassemblies. The
        rotation symbol is left as ``None`` here.

        Args:
            candidate: Step candidate whose score details are a ``_StepScore``.
            result: Classification result used to build child elements.

        Returns:
            The constructed Step.
        """
        details = candidate.score_details
        assert isinstance(details, _StepScore)

        # Child element: the step number is always required.
        step_num = result.build(details.step_number_candidate)
        assert isinstance(step_num, StepNumber)

        # Child element: the parts list is optional.
        parts_list: PartsList | None = None
        if details.parts_list_candidate:
            built_parts_list = result.build(details.parts_list_candidate)
            assert isinstance(built_parts_list, PartsList)
            parts_list = built_parts_list

        # The diagram is chosen before the bbox because the bbox depends on it.
        diagram = self._find_and_build_diagram_for_step(step_num, parts_list, result)
        step_bbox = self._compute_step_bbox(
            step_num, parts_list, diagram, result.page_data.bbox
        )

        arrows = self._get_arrows_for_step(step_num, diagram, result)
        subassemblies = self._get_subassemblies_for_step(step_num, diagram, result)

        return Step(
            bbox=step_bbox,
            step_number=step_num,
            parts_list=parts_list,
            diagram=diagram,
            rotation_symbol=None,
            arrows=arrows,
            subassemblies=subassemblies,
        )
392

393
    def _find_and_build_diagram_for_step(
        self,
        step_num: StepNumber,
        parts_list: PartsList | None,
        result: ClassificationResult,
    ) -> Diagram | None:
        """Pick and build the best still-unbuilt diagram for a step.

        Every non-failed diagram candidate that has not been constructed yet
        is scored against the step number's bbox with
        ``_score_step_diagram_pair``. The highest-scoring one is built,
        provided its score is at least 0.2.

        Args:
            step_num: The built step number element.
            parts_list: The built parts list element (if any). Not used by
                the current implementation.
            result: Classification result containing diagram candidates.

        Returns:
            The built Diagram, or None when there is no candidate, the best
            score is below 0.2, or building raises ``CandidateFailedError``.
        """
        unbuilt = [
            c
            for c in result.get_scored_candidates(
                "diagram", valid_only=False, exclude_failed=True
            )
            if c.constructed is None
        ]

        if not unbuilt:
            log.debug(
                "[step] No diagram candidates available for step %d",
                step_num.value,
            )
            return None

        anchor_bbox = step_num.bbox
        best_candidate = None
        best_score = -float("inf")

        for option in unbuilt:
            option_score = self._score_step_diagram_pair(anchor_bbox, option.bbox)

            log.debug(
                "[step] Diagram candidate at %s for step %d: score=%.2f",
                option.bbox,
                step_num.value,
                option_score,
            )

            # Strict comparison: on ties the earliest candidate wins.
            if option_score > best_score:
                best_score, best_candidate = option_score, option

        if best_candidate is None or best_score < 0.2:
            log.debug(
                "[step] No suitable diagram found for step %d (best_score=%.2f)",
                step_num.value,
                best_score,
            )
            return None

        try:
            built = result.build(best_candidate)
        except CandidateFailedError as exc:
            log.debug(
                "[step] Failed to build diagram for step %d: %s",
                step_num.value,
                exc,
            )
            return None

        assert isinstance(built, Diagram)
        log.debug(
            "[step] Built diagram at %s for step %d (score=%.2f)",
            built.bbox,
            step_num.value,
            best_score,
        )
        return built
473

474
    def _create_step_candidate(
        self,
        step_candidate: Candidate,
        parts_list_candidate: Candidate | None,
        result: ClassificationResult,
    ) -> Candidate | None:
        """Create a Step candidate (without diagram assignment).

        The step value is parsed from the first source block of the step
        number candidate. When a parts list candidate is given and sits above
        the step number (within a small tolerance), proximity and alignment
        scores are computed; otherwise both stay at 0.0.

        Args:
            step_candidate: The StepNumber candidate for this step
            parts_list_candidate: The PartsList candidate to pair with (or None)
            result: Classification result (not used by the current
                implementation)

        Returns:
            The created Candidate with score but no construction, or None if
            the step number candidate has no source blocks, its first source
            block is not Text, or the step number value cannot be extracted.
        """
        ABOVE_EPS = 2.0  # Small epsilon for "above" check
        ALIGNMENT_THRESHOLD_MULTIPLIER = 1.0  # Max horizontal offset
        DISTANCE_THRESHOLD_MULTIPLIER = 1.0  # Max vertical distance

        # Extract step number value from the candidate
        if not step_candidate.source_blocks:
            return None
        source_block = step_candidate.source_blocks[0]
        if not isinstance(source_block, Text):
            return None
        step_value = extract_step_number_value(source_block.text)
        if step_value is None:
            return None

        step_bbox = step_candidate.bbox
        parts_list_bbox = (
            parts_list_candidate.bbox if parts_list_candidate is not None else None
        )

        # Pairing scores stay at 0.0 unless the parts list is above the step.
        # NOTE(review): a parts list that is NOT above the step still yields
        # has_parts_list=True, i.e. an overall score of 0.5, which outranks
        # the 0.3 no-parts-list fallback - confirm this is intended.
        proximity_score = 0.0
        alignment_score = 0.0

        if (
            parts_list_bbox is not None
            and parts_list_bbox.y1 <= step_bbox.y0 + ABOVE_EPS
        ):
            # Vertical gap between the parts list bottom and the step top.
            # The ABOVE_EPS tolerance admits a slight overlap, which would
            # make the raw gap negative and push the proximity score above
            # its documented 1.0 maximum, so clamp the gap at zero.
            distance = max(0.0, step_bbox.y0 - parts_list_bbox.y1)

            # Proximity: 1.0 when touching, falling to 0.0 at one step-number
            # height away.
            max_distance = step_bbox.height * DISTANCE_THRESHOLD_MULTIPLIER
            if max_distance > 0:
                proximity_score = max(0.0, 1.0 - (distance / max_distance))

            # Alignment: 1.0 when left edges coincide, falling to 0.0 at one
            # step-number width of horizontal offset.
            max_alignment_diff = step_bbox.width * ALIGNMENT_THRESHOLD_MULTIPLIER
            left_diff = abs(parts_list_bbox.x0 - step_bbox.x0)
            if max_alignment_diff > 0:
                alignment_score = max(0.0, 1.0 - (left_diff / max_alignment_diff))

        # Create score object with candidate references
        # Diagrams are found at build time, not during scoring
        score = _StepScore(
            step_value=step_value,
            step_number_candidate=step_candidate,
            parts_list_candidate=parts_list_candidate,
            has_parts_list=parts_list_candidate is not None,
            step_proximity_score=proximity_score,
            step_alignment_score=alignment_score,
        )

        # Combined bbox for the candidate (without diagram). Use an explicit
        # None check rather than relying on BBox truthiness.
        combined_bbox = step_bbox
        if parts_list_bbox is not None:
            combined_bbox = BBox.union(combined_bbox, parts_list_bbox)

        return Candidate(
            bbox=combined_bbox,
            label="step",
            score=score.overall_score(),
            score_details=score,
            source_blocks=[],
        )
557

558
    def _score_step_diagram_pair(
1✔
559
        self,
560
        step_bbox: BBox,
561
        diagram_bbox: BBox,
562
    ) -> float:
563
        """Score how well a diagram matches a step.
564

565
        Diagrams are typically positioned to the right of and/or below the step
566
        number. This method scores based on:
567
        - Horizontal position: prefer diagrams to the right, penalize left
568
        - Vertical position: prefer diagrams below the step header
569
        - Distance: closer is better
570

571
        Args:
572
            step_bbox: The step number bounding box
573
            diagram_bbox: The diagram bounding box to score
574

575
        Returns:
576
            Score between 0.0 and 1.0 (higher is better match)
577
        """
578
        # Reference point: bottom-right of step number
579
        ref_x = step_bbox.x1
1✔
580
        ref_y = step_bbox.y1
1✔
581

582
        # TODO Move all these constants into config, or make them adaptive?
583

584
        # Horizontal score
585
        # Diagrams to the right are preferred, but allow some overlap
586
        x_offset = diagram_bbox.x0 - ref_x
1✔
587

588
        if x_offset >= -50:
1✔
589
            # Diagram starts to the right or slightly overlapping - good
590
            # Score decreases slightly with distance to the right
591
            x_score = max(0.5, 1.0 - abs(x_offset) / 400.0)
1✔
592
        elif x_offset >= -200:
1✔
593
            # Diagram is moderately to the left - acceptable
594
            x_score = 0.3 + 0.2 * (1.0 + x_offset / 200.0)
1✔
595
        else:
596
            # Diagram is far to the left - poor match
597
            x_score = max(0.1, 0.3 + x_offset / 400.0)
1✔
598

599
        # Vertical score
600
        # Diagrams below the step header are preferred
601
        y_offset = diagram_bbox.y0 - ref_y
1✔
602

603
        if y_offset >= -30:
1✔
604
            # Diagram starts below or slightly overlapping - good
605
            # Score decreases with vertical distance
606
            y_score = max(0.3, 1.0 - abs(y_offset) / 300.0)
1✔
607
        elif y_offset >= -100:
1✔
608
            # Diagram is moderately above - less good but acceptable
609
            y_score = 0.2 + 0.3 * (1.0 + y_offset / 100.0)
1✔
610
        else:
611
            # Diagram is far above - poor match
612
            y_score = max(0.05, 0.2 + y_offset / 300.0)
1✔
613

614
        # Combined score - weight both dimensions equally
615
        score = 0.5 * x_score + 0.5 * y_score
1✔
616

617
        return score
1✔
618

619
    def _get_rotation_symbol_for_step(
        self,
        step_bbox: BBox,
        result: ClassificationResult,
    ) -> RotationSymbol | None:
        """Return the best already-built rotation symbol overlapping a step.

        Only rotation_symbol candidates that are already valid (successfully
        constructed) are considered; this method does not build anything.
        Among those overlapping ``step_bbox``, the best-scoring one is chosen.

        Args:
            step_bbox: The step's bounding box to search within.
            result: Classification result containing rotation symbol candidates

        Returns:
            The selected RotationSymbol, or None if no built rotation symbol
            overlaps the step's bounding box.
        """
        built_symbols = result.get_scored_candidates(
            "rotation_symbol", valid_only=True
        )

        log.debug(
            "[step] Looking for rotation symbols in step bbox %s, "
            "found %d built candidates",
            step_bbox,
            len(built_symbols),
        )

        if not built_symbols:
            return None

        best = find_best_scoring(filter_overlapping(built_symbols, step_bbox))

        if not best or best.constructed is None:
            log.debug("[step] No rotation symbol found in step bbox %s", step_bbox)
            return None

        symbol = best.constructed
        assert isinstance(symbol, RotationSymbol)
        log.debug(
            "[step] Found rotation symbol at %s (score=%.2f)",
            symbol.bbox,
            best.score,
        )
        return symbol
673

674
    def _get_arrows_for_step(
        self,
        step_num: StepNumber,
        diagram: Diagram | None,
        result: ClassificationResult,
    ) -> list[Arrow]:
        """Build and collect the arrows located near a step.

        The search area is the diagram's bbox (or the step number's bbox when
        there is no diagram) expanded by 100 units. Every non-failed arrow
        candidate overlapping that area is built; candidates whose build
        raises ``CandidateFailedError`` are skipped.

        NOTE(review): this reads "arrow" candidates, but "arrow" is not listed
        in the classifier's ``requires`` set - confirm arrows are guaranteed
        to be scored before steps are built.

        Args:
            step_num: The step number element
            diagram: The diagram element (if any)
            result: Classification result containing arrow candidates

        Returns:
            List of Arrow elements for this step (possibly empty).
        """
        arrow_candidates = result.get_scored_candidates(
            "arrow", valid_only=False, exclude_failed=True
        )

        log.debug(
            "[step] Looking for arrows for step %d, found %d candidates",
            step_num.value,
            len(arrow_candidates),
        )

        if not arrow_candidates:
            return []

        # Anchor the search on the diagram when there is one, otherwise on
        # the step number itself.
        if diagram:
            anchor_bbox = diagram.bbox
        else:
            anchor_bbox = step_num.bbox
        search_region = anchor_bbox.expand(100.0)

        log.debug(
            "[step] Arrow search region for step %d: %s",
            step_num.value,
            search_region,
        )

        arrows: list[Arrow] = []
        for candidate in filter_overlapping(arrow_candidates, search_region):
            log.debug(
                "[step]   Arrow candidate at %s, overlaps=True, score=%.2f",
                candidate.bbox,
                candidate.score,
            )
            try:
                arrow = result.build(candidate)
            except CandidateFailedError:
                # Expected when overlapping arrows compete for the same
                # source blocks; the losing candidate is simply dropped.
                log.debug(
                    "[step]   Arrow candidate at %s failed (conflict), skipping",
                    candidate.bbox,
                )
                continue
            assert isinstance(arrow, Arrow)
            arrows.append(arrow)

        log.debug(
            "[step] Found %d arrows for step %d",
            len(arrows),
            step_num.value,
        )
        return arrows
749

750
    def _get_subassemblies_for_step(
        self,
        step_num: StepNumber,
        diagram: Diagram | None,
        result: ClassificationResult,
    ) -> list[SubAssembly]:
        """Collect the SubAssembly elements that belong to this step.

        SubAssemblies are callout boxes showing sub-assemblies. Every
        "subassembly" candidate that ``filter_overlapping`` keeps for the
        search region (the diagram bbox, or the step number bbox when there is
        no diagram, expanded by a fixed margin) is built via ``result.build``.
        Candidates that fail to build are logged at DEBUG level and skipped.

        Args:
            step_num: The step number element
            diagram: The diagram element (if any)
            result: Classification result containing subassembly candidates

        Returns:
            List of SubAssembly elements for this step (possibly empty)
        """
        # Subassemblies can sit further from the main diagram than other
        # step-level elements, hence the generous margin.
        margin = 150.0

        candidates = result.get_scored_candidates(
            "subassembly", valid_only=False, exclude_failed=True
        )
        log.debug(
            "[step] Looking for subassemblies for step %d, found %d candidates",
            step_num.value,
            len(candidates),
        )
        if not candidates:
            return []

        # Anchor the search on the diagram when there is one, otherwise on the
        # step number itself.
        anchor = diagram if diagram else step_num
        region = anchor.bbox.expand(margin)
        log.debug(
            "[step] SubAssembly search region for step %d: %s",
            step_num.value,
            region,
        )

        built: list[SubAssembly] = []
        for cand in filter_overlapping(candidates, region):
            log.debug(
                "[step]   SubAssembly candidate at %s, overlaps=True, score=%.2f",
                cand.bbox,
                cand.score,
            )
            try:
                element = result.build(cand)
                assert isinstance(element, SubAssembly)
            except Exception as e:
                # Best effort: a candidate that cannot be built is skipped so
                # the remaining candidates are still considered.
                log.debug(
                    "[step]   Failed to build subassembly at %s: %s",
                    cand.bbox,
                    e,
                )
            else:
                built.append(element)

        log.debug(
            "[step] Found %d subassemblies for step %d",
            len(built),
            step_num.value,
        )
        return built
1✔
826

827
    def _compute_step_bbox(
        self,
        step_num: StepNumber,
        parts_list: PartsList | None,
        diagram: Diagram | None,
        page_bbox: BBox,
    ) -> BBox:
        """Return the bounding box that encloses the whole Step.

        The box is the union of the step number's bbox with the parts list and
        diagram bboxes when those are present. It is then clipped to the page,
        because some elements (e.g., arrows in diagrams) can extend slightly
        off-page.

        Args:
            step_num: The step number element
            parts_list: The parts list (if any)
            diagram: The diagram element (if any)
            page_bbox: The page bounding box to clip to

        Returns:
            Combined bounding box, clipped to page bounds
        """
        # The step number is always present; the other two parts are optional.
        optional_parts = (parts_list, diagram)
        member_boxes = [
            step_num.bbox,
            *(part.bbox for part in optional_parts if part),
        ]
        combined = BBox.union_all(member_boxes)
        return combined.clip_to(page_bbox)
1✔
856

857

858
def assign_rotation_symbols_to_steps(
    steps: list[Step],
    rotation_symbols: list[RotationSymbol],
    max_distance: float = 300.0,
) -> None:
    """Assign rotation symbols to steps using the Hungarian algorithm.

    Builds a cost matrix of Euclidean distances between each rotation symbol's
    bbox center and each step's diagram bbox center (or the step's own bbox
    center when the step has no diagram), then solves the one-to-one
    assignment with ``scipy.optimize.linear_sum_assignment``.

    This function mutates the Step objects in place: for every accepted pair
    it sets ``step.rotation_symbol`` and grows ``step.bbox`` to include the
    symbol's bbox.

    Args:
        steps: List of Step objects to assign rotation symbols to
        rotation_symbols: List of RotationSymbol objects to assign
        max_distance: Maximum distance for a valid assignment. Pairs with
            distance greater than this will not be matched.
    """
    if not steps or not rotation_symbols:
        log.debug(
            "[step] No rotation symbol assignment needed: "
            "steps=%d, rotation_symbols=%d",
            len(steps),
            len(rotation_symbols),
        )
        return

    n_steps = len(steps)
    n_symbols = len(rotation_symbols)

    log.debug(
        "[step] Running Hungarian matching: %d steps, %d rotation symbols",
        n_steps,
        n_symbols,
    )

    # A step's target point does not depend on the symbol, so resolve it once
    # per step instead of once per (symbol, step) pair.
    target_centers = [
        step.diagram.bbox.center if step.diagram else step.bbox.center
        for step in steps
    ]

    # Build cost matrix: rows = rotation symbols, cols = steps
    cost_matrix = np.zeros((n_symbols, n_steps))

    for i, rs in enumerate(rotation_symbols):
        rs_center = rs.bbox.center
        for j, step in enumerate(steps):
            target_center = target_centers[j]

            # Euclidean distance between the two centers
            distance = (
                (rs_center[0] - target_center[0]) ** 2
                + (rs_center[1] - target_center[1]) ** 2
            ) ** 0.5

            cost_matrix[i, j] = distance
            log.debug(
                "[step]   Distance from rotation_symbol[%d] at %s to step[%d]: %.1f",
                i,
                rs.bbox,
                step.step_number.value,
                distance,
            )

    # Replace out-of-range costs with one large value so the solver avoids
    # those pairs where it can.
    # NOTE(review): the penalty is finite, so the solver minimises total cost
    # rather than strictly maximising the number of in-range pairs; confirm
    # this is acceptable for pages with many symbols.
    high_cost = max_distance * 10  # Arbitrary large value
    cost_matrix_thresholded = np.where(
        cost_matrix > max_distance, high_cost, cost_matrix
    )

    # Run Hungarian algorithm
    row_indices, col_indices = linear_sum_assignment(cost_matrix_thresholded)

    # Apply the matching, re-checking each pair against the real
    # (un-thresholded) distance so penalised pairs are never assigned.
    for row_idx, col_idx in zip(row_indices, col_indices, strict=True):
        pair_distance = cost_matrix[row_idx, col_idx]
        if pair_distance <= max_distance:
            rs = rotation_symbols[row_idx]
            step = steps[col_idx]
            step.rotation_symbol = rs
            # Expand step bbox to include the rotation symbol
            step.bbox = step.bbox.union(rs.bbox)
            log.debug(
                "[step] Assigned rotation symbol at %s to step %d "
                "(distance=%.1f, new step bbox=%s)",
                rs.bbox,
                step.step_number.value,
                pair_distance,
                step.bbox,
            )
        else:
            # Report the step column of this very pair. Looking it up through
            # col_indices[row_idx] is wrong when there are more symbols than
            # steps, because row_idx is a matrix row, not a position in the
            # assignment arrays.
            log.debug(
                "[step] Skipped assignment: rotation_symbol[%d] to step[%d] "
                "distance=%.1f > max_distance=%.1f",
                row_idx,
                col_idx,
                pair_distance,
                max_distance,
            )
961

962

963
def filter_subassembly_values[T](
1✔
964
    items: list[tuple[int, T]],
965
    *,
966
    min_gap: int = 3,
967
    max_subassembly_start: int = 3,
968
) -> list[tuple[int, T]]:
969
    """Filter items to exclude likely subassembly values.
970

971
    Subassembly steps (e.g., step 1, 2 inside a subassembly box) often appear
972
    alongside higher-numbered page-level steps (e.g., 15, 16). This creates
973
    sequences like [1, 2, 15, 16] which cannot all be page-level steps.
974

975
    This function identifies such cases by detecting a significant gap in values
976
    and filtering out the lower-numbered items that are likely subassembly steps.
977

978
    Args:
979
        items: List of (value, item) tuples where value is the step number
980
            and item is any associated data (e.g., a Candidate).
981
        min_gap: Minimum gap size to trigger filtering (exclusive).
982
            Default is 3, so gaps > 3 trigger filtering.
983
        max_subassembly_start: Maximum starting value for subassembly detection.
984
            If min value is <= this, filtering is applied. Default is 3.
985

986
    Returns:
987
        Filtered list of (value, item) tuples, keeping only the higher values
988
        if a subassembly pattern is detected. Returns original list if no
989
        filtering is needed.
990

991
    Examples:
992
        >>> filter_subassembly_values([(1, "a"), (2, "b"), (15, "c"), (16, "d")])
993
        [(15, "c"), (16, "d")]
994

995
        >>> filter_subassembly_values([(5, "a"), (6, "b"), (15, "c")])
996
        [(5, "a"), (6, "b"), (15, "c")]  # min=5 > 3, no filtering
997
    """
998
    if len(items) <= 1:
1✔
999
        return items
1✔
1000

1001
    # Sort by value
1002
    sorted_items = sorted(items, key=lambda x: x[0])
1✔
1003
    values = [v for v, _ in sorted_items]
1✔
1004

1005
    # Find the largest gap between consecutive values
1006
    max_gap_size = 0
1✔
1007
    max_gap_index = -1
1✔
1008
    for i in range(len(values) - 1):
1✔
1009
        gap = values[i + 1] - values[i]
1✔
1010
        if gap > max_gap_size:
1✔
1011
            max_gap_size = gap
1✔
1012
            max_gap_index = i
1✔
1013

1014
    # Check if filtering should occur:
1015
    # 1. Gap must be larger than min_gap
1016
    # 2. The minimum value must be <= max_subassembly_start
1017
    if max_gap_size > min_gap and max_gap_index >= 0:
1✔
1018
        min_value = values[0]
1✔
1019
        if min_value <= max_subassembly_start:
1✔
1020
            threshold = values[max_gap_index + 1]
1✔
1021
            return [(v, item) for v, item in sorted_items if v >= threshold]
1✔
1022

1023
    return items
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc