• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

bramp / build-along / 19996847631

07 Dec 2025 01:07AM UTC coverage: 90.333%. Remained the same
19996847631

push

github

bramp
Refactor NewBagClassifier: move bag_number discovery to build time

- Remove bag_number_candidate from _NewBagScore (pre-assignment)
- Keep has_bag_number boolean for scoring bonus (+0.2)
- Rename _find_bag_number_in_cluster to _has_bag_number_in_cluster (returns bool)
- Add _find_and_build_bag_number() called at build time
- Update build() to discover bag number dynamically

37 of 39 new or added lines in 3 files covered. (94.87%)

21 existing lines in 3 files now uncovered.

10541 of 11669 relevant lines covered (90.33%)

0.9 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.65
/src/build_a_long/pdf_extract/classifier/steps/step_classifier.py
1
"""
2
Step classifier.
3

4
Purpose
5
-------
6
Identify complete Step structures by combining step_number, parts_list, and diagram
7
elements. A Step represents a single building instruction comprising:
8
- A StepNumber label
9
- An optional PartsList (the parts needed for this step)
10
- A Diagram (the main instruction graphic showing what to build)
11

12
We look for step_numbers and attempt to pair them with nearby parts_lists and
13
identify the appropriate diagram region for each step.
14

15
Debugging
16
---------
17
Set environment variables to aid investigation without code changes:
18

19
- LOG_LEVEL=DEBUG
20
    Enables DEBUG-level logging (if not already configured by caller).
21
"""
22

23
import logging
1✔
24

25
import numpy as np
1✔
26
from scipy.optimize import linear_sum_assignment
1✔
27

28
from build_a_long.pdf_extract.classifier.candidate import Candidate
1✔
29
from build_a_long.pdf_extract.classifier.classification_result import (
1✔
30
    CandidateFailedError,
31
    ClassificationResult,
32
)
33
from build_a_long.pdf_extract.classifier.label_classifier import (
1✔
34
    LabelClassifier,
35
)
36
from build_a_long.pdf_extract.classifier.parts.parts_list_classifier import (
1✔
37
    _PartsListScore,
38
)
39
from build_a_long.pdf_extract.classifier.score import (
1✔
40
    Score,
41
    Weight,
42
    find_best_scoring,
43
)
44
from build_a_long.pdf_extract.classifier.text import (
1✔
45
    extract_step_number_value,
46
)
47
from build_a_long.pdf_extract.extractor.bbox import BBox, filter_overlapping
1✔
48
from build_a_long.pdf_extract.extractor.lego_page_elements import (
1✔
49
    Arrow,
50
    Diagram,
51
    PartsList,
52
    RotationSymbol,
53
    Step,
54
    StepNumber,
55
    SubAssembly,
56
)
57
from build_a_long.pdf_extract.extractor.page_blocks import Text
1✔
58

59
log = logging.getLogger(__name__)
1✔
60

61

62
class _StepScore(Score):
    """Score details for one StepNumber / PartsList pairing.

    Only the relationship between the step number and its (optional) parts
    list is rated here; diagrams are resolved later, at build time.
    """

    step_number_candidate: Candidate
    """The step number candidate this step is associated with."""

    parts_list_candidate: Candidate | None
    """The parts list candidate paired with this step (if any)."""

    has_parts_list: bool
    """Whether this step has an associated parts list."""

    step_proximity_score: float
    """Score based on proximity to the PartsList above (0.0-1.0).
    1.0 for closest proximity, 0.0 if very far. 0.0 if no parts list."""

    step_alignment_score: float
    """Score based on left-edge alignment with PartsList above (0.0-1.0).
    1.0 is perfect alignment, 0.0 is very misaligned. 0.0 if no parts list."""

    def score(self) -> Weight:
        """Expose ``overall_score()`` under the generic ``score`` name."""
        return self.overall_score()

    def overall_score(self) -> float:
        """Rate the pairing on a fixed scale.

        Returns:
            0.3 when no parts list is attached. Otherwise 0.5 plus half of
            the mean of the proximity and alignment scores, so any step
            with a parts list outranks every step without one.
        """
        if not self.has_parts_list:
            # Flat, lower base score for steps without a parts list.
            return 0.3

        pairing_quality = (
            self.step_proximity_score + self.step_alignment_score
        ) / 2.0
        return 0.5 + 0.5 * pairing_quality

    def sort_key(self) -> tuple[float, int]:
        """Build an ascending sort key: best score first, then lowest step.

        Returns:
            ``(-overall_score, step_value)``. ``step_value`` is parsed from
            the first source block of the step number candidate when that
            block is a ``Text``; it falls back to 0 when there is no such
            block or no value can be extracted.
        """
        blocks = self.step_number_candidate.source_blocks

        parsed_value = None
        # Only the first source block is consulted for the step number.
        if blocks and isinstance(blocks[0], Text):
            parsed_value = extract_step_number_value(blocks[0].text)

        return (-self.overall_score(), 0 if parsed_value is None else parsed_value)
122

123

124
class StepClassifier(LabelClassifier):
1✔
125
    """Classifier for complete Step structures."""
126

127
    output = "step"
1✔
128
    requires = frozenset(
1✔
129
        {"step_number", "parts_list", "diagram", "rotation_symbol", "subassembly"}
130
    )
131

132
    def _score(self, result: ClassificationResult) -> None:
        """Enumerate StepNumber/PartsList pairings and register the survivors.

        Every step number candidate is paired with every parts list
        candidate, plus one pairing with no parts list as a fallback. The
        pairings are then deduplicated and the remaining candidates are
        added to ``result``.

        Args:
            result: Classification result to read candidates from and to
                add the selected step candidates to.
        """
        step_numbers = result.get_scored_candidates(
            "step_number", valid_only=False, exclude_failed=True
        )
        if not step_numbers:
            return

        parts_lists = result.get_scored_candidates(
            "parts_list",
            valid_only=False,
            exclude_failed=True,
        )

        log.debug(
            "[step] page=%s step_candidates=%d parts_list_candidates=%d",
            result.page_data.page_number,
            len(step_numbers),
            len(parts_lists),
        )

        # The trailing None produces the "no parts list" fallback pairing
        # after all real parts lists have been tried for a step number.
        pairings: list[Candidate] = []
        for step_number in step_numbers:
            for partner in [*parts_lists, None]:
                pairing = self._create_step_candidate(step_number, partner, result)
                if pairing:
                    pairings.append(pairing)

        survivors = self._deduplicate_and_assign_diagrams(pairings, result)
        for survivor in survivors:
            result.add_candidate(survivor)

        log.debug(
            "[step] Created %d deduplicated step candidates (from %d possibilities)",
            len(survivors),
            len(pairings),
        )
189

190
    def build(self, candidate: Candidate, result: ClassificationResult) -> Step:
        """Assemble a ``Step`` from a scored step candidate.

        Args:
            candidate: Step candidate whose ``score_details`` is a
                ``_StepScore``.
            result: Classification result used to build the referenced
                child candidates.

        Returns:
            The Step, with ``rotation_symbol`` left as None (it is assigned
            later).
        """
        details = candidate.score_details
        assert isinstance(details, _StepScore)

        step_number = result.build(details.step_number_candidate)
        assert isinstance(step_number, StepNumber)

        # The parts list is optional; build it only when one was paired.
        parts_list: PartsList | None = None
        if details.parts_list_candidate:
            built_parts_list = result.build(details.parts_list_candidate)
            assert isinstance(built_parts_list, PartsList)
            parts_list = built_parts_list

        # The diagram is resolved before the bbox, arrows and subassemblies
        # because all three of them take it as input.
        diagram = self._find_and_build_diagram_for_step(
            step_number, parts_list, result
        )
        bbox = self._compute_step_bbox(
            step_number, parts_list, diagram, result.page_data.bbox
        )
        arrows = self._get_arrows_for_step(step_number, diagram, result)
        subassemblies = self._get_subassemblies_for_step(step_number, diagram, result)

        return Step(
            bbox=bbox,
            step_number=step_number,
            parts_list=parts_list,
            diagram=diagram,
            rotation_symbol=None,
            arrows=arrows,
            subassemblies=subassemblies,
        )
233

234
    def _find_and_build_diagram_for_step(
        self,
        step_num: StepNumber,
        parts_list: PartsList | None,
        result: ClassificationResult,
    ) -> Diagram | None:
        """Choose the best not-yet-built diagram candidate for a step and build it.

        Candidates are ranked with ``_score_step_diagram_pair`` against the
        step number's bbox. The first candidate with the highest score wins,
        provided its score is at least 0.2.

        Args:
            step_num: The built step number element.
            parts_list: The built parts list element (if any). Not used by
                the current selection logic.
            result: Classification result holding the diagram candidates.

        Returns:
            The built Diagram, or None when no candidate is available, none
            scores at least 0.2, or building the winner raises
            ``CandidateFailedError``.
        """
        unbuilt = [
            c
            for c in result.get_scored_candidates(
                "diagram", valid_only=False, exclude_failed=True
            )
            if c.constructed is None
        ]
        if not unbuilt:
            log.debug(
                "[step] No diagram candidates available for step %d",
                step_num.value,
            )
            return None

        anchor_bbox = step_num.bbox
        winner = None
        winner_score = -float("inf")

        for option in unbuilt:
            option_score = self._score_step_diagram_pair(anchor_bbox, option.bbox)
            log.debug(
                "[step] Diagram candidate at %s for step %d: score=%.2f",
                option.bbox,
                step_num.value,
                option_score,
            )
            # Strict comparison: on a tie the earlier candidate is kept.
            if option_score > winner_score:
                winner, winner_score = option, option_score

        if winner is None or winner_score < 0.2:
            log.debug(
                "[step] No suitable diagram found for step %d (best_score=%.2f)",
                step_num.value,
                winner_score,
            )
            return None

        try:
            built = result.build(winner)
        except CandidateFailedError as e:
            log.debug(
                "[step] Failed to build diagram for step %d: %s",
                step_num.value,
                e,
            )
            return None

        assert isinstance(built, Diagram)
        log.debug(
            "[step] Built diagram at %s for step %d (score=%.2f)",
            built.bbox,
            step_num.value,
            winner_score,
        )
        return built
314

315
    def _create_step_candidate(
        self,
        step_candidate: Candidate,
        parts_list_candidate: Candidate | None,
        result: ClassificationResult,
    ) -> Candidate | None:
        """Create a Step candidate for one StepNumber/PartsList pairing.

        Diagrams are found at build time, not during scoring, to allow
        rotation symbols to claim small images first.

        Proximity and alignment are only scored when the parts list sits
        above the step number (its bottom edge may dip at most ``ABOVE_EPS``
        below the step number's top edge). Otherwise both stay at 0.0.

        Args:
            step_candidate: The StepNumber candidate for this step.
            parts_list_candidate: The PartsList candidate to pair with, or
                None for the "no parts list" fallback.
            result: Classification result. Not used by this method.

        Returns:
            A Candidate labelled "step" carrying a ``_StepScore``; its bbox
            is the step number bbox, unioned with the parts list bbox when
            one is given. The current implementation never returns None.
        """
        ABOVE_EPS = 2.0  # Small epsilon for "above" check
        ALIGNMENT_THRESHOLD_MULTIPLIER = 1.0  # Max horizontal offset
        DISTANCE_THRESHOLD_MULTIPLIER = 1.0  # Max vertical distance

        step_bbox = step_candidate.bbox
        parts_list_bbox = (
            parts_list_candidate.bbox if parts_list_candidate is not None else None
        )

        proximity_score = 0.0
        alignment_score = 0.0

        if (
            parts_list_bbox is not None
            and parts_list_bbox.y1 <= step_bbox.y0 + ABOVE_EPS
        ):
            # Vertical gap between the parts list bottom and the step top.
            # The ABOVE_EPS tolerance lets the boxes overlap slightly, which
            # would make the raw gap negative and push the proximity score
            # above 1.0; clamp so the score stays within 0.0-1.0.
            distance = max(0.0, step_bbox.y0 - parts_list_bbox.y1)

            # Proximity falls linearly to 0 at one step-number height away.
            max_distance = step_bbox.height * DISTANCE_THRESHOLD_MULTIPLIER
            if max_distance > 0:
                proximity_score = max(0.0, 1.0 - (distance / max_distance))

            # Alignment falls linearly to 0 at one step-number width of
            # left-edge offset.
            max_alignment_diff = step_bbox.width * ALIGNMENT_THRESHOLD_MULTIPLIER
            left_diff = abs(parts_list_bbox.x0 - step_bbox.x0)
            if max_alignment_diff > 0:
                alignment_score = max(0.0, 1.0 - (left_diff / max_alignment_diff))

        score = _StepScore(
            step_number_candidate=step_candidate,
            parts_list_candidate=parts_list_candidate,
            has_parts_list=parts_list_candidate is not None,
            step_proximity_score=proximity_score,
            step_alignment_score=alignment_score,
        )

        # Candidate bbox covers the step number and, if paired, the parts
        # list. The diagram is not known yet.
        combined_bbox = step_bbox
        if parts_list_bbox is not None:
            combined_bbox = BBox.union(combined_bbox, parts_list_bbox)

        return Candidate(
            bbox=combined_bbox,
            label="step",
            score=score.overall_score(),
            score_details=score,
            source_blocks=[],
        )
386

387
    def _score_step_diagram_pair(
        self,
        step_bbox: BBox,
        diagram_bbox: BBox,
    ) -> float:
        """Rate how plausibly a diagram belongs to a step number.

        The diagram's top-left corner is compared with the step number's
        bottom-right corner. Diagrams starting to the right of and below
        that point (or overlapping it slightly) score best; diagrams
        starting further left or above score progressively worse.

        Args:
            step_bbox: The step number bounding box.
            diagram_bbox: The diagram bounding box to score.

        Returns:
            The mean of the horizontal and vertical sub-scores (higher is a
            better match).
        """
        # TODO Move all these constants into config, or make them adaptive?

        # Horizontal: offset of the diagram's left edge from the step's right edge.
        dx = diagram_bbox.x0 - step_bbox.x1
        if dx >= -50:
            # Right of the step number, or overlapping by at most 50:
            # decays with distance but never drops below 0.5.
            horizontal = max(0.5, 1.0 - abs(dx) / 400.0)
        elif dx >= -200:
            # Moderately to the left: ramps from 0.3 (at -200) towards 0.45.
            horizontal = 0.3 + 0.2 * (1.0 + dx / 200.0)
        else:
            # Far to the left. For dx < -200 the linear term is already
            # below the floor, so this always evaluates to 0.1.
            horizontal = max(0.1, 0.3 + dx / 400.0)

        # Vertical: offset of the diagram's top edge from the step's bottom edge.
        dy = diagram_bbox.y0 - step_bbox.y1
        if dy >= -30:
            # Below the step number, or overlapping by at most 30:
            # decays with distance but never drops below 0.3.
            vertical = max(0.3, 1.0 - abs(dy) / 300.0)
        elif dy >= -100:
            # Moderately above: ramps from 0.2 (at -100) towards 0.41.
            vertical = 0.2 + 0.3 * (1.0 + dy / 100.0)
        else:
            # Far above. For dy < -100 the linear term is already below
            # the floor, so this always evaluates to 0.05.
            vertical = max(0.05, 0.2 + dy / 300.0)

        # Both axes carry equal weight.
        return 0.5 * horizontal + 0.5 * vertical
447

448
    def _get_rotation_symbol_for_step(
        self,
        step_bbox: BBox,
        result: ClassificationResult,
    ) -> RotationSymbol | None:
        """Look up an already-built rotation symbol overlapping a step's area.

        Only rotation symbol candidates returned with ``valid_only=True``
        are considered; nothing is built here. Among those overlapping
        ``step_bbox``, the best-scoring one is returned.

        Args:
            step_bbox: The step's bounding box (including step_num,
                parts_list, and diagram).
            result: Classification result containing rotation symbol
                candidates.

        Returns:
            The matching RotationSymbol, or None if there is none.
        """
        built_symbols = result.get_scored_candidates(
            "rotation_symbol", valid_only=True
        )

        log.debug(
            "[step] Looking for rotation symbols in step bbox %s, "
            "found %d built candidates",
            step_bbox,
            len(built_symbols),
        )

        if not built_symbols:
            return None

        winner = find_best_scoring(filter_overlapping(built_symbols, step_bbox))

        if not winner or winner.constructed is None:
            log.debug("[step] No rotation symbol found in step bbox %s", step_bbox)
            return None

        symbol = winner.constructed
        assert isinstance(symbol, RotationSymbol)
        log.debug(
            "[step] Found rotation symbol at %s (score=%.2f)",
            symbol.bbox,
            winner.score,
        )
        return symbol
502

503
    def _get_arrows_for_step(
        self,
        step_num: StepNumber,
        diagram: Diagram | None,
        result: ClassificationResult,
    ) -> list[Arrow]:
        """Build every arrow candidate lying near the step's diagram.

        The search area is the diagram bbox (or the step number bbox when
        there is no diagram) expanded by 100.0. Each overlapping arrow
        candidate is built; candidates whose build raises
        ``CandidateFailedError`` are skipped.

        Args:
            step_num: The step number element.
            diagram: The diagram element (if any).
            result: Classification result containing arrow candidates.

        Returns:
            The Arrow elements that were built successfully (possibly empty).
        """
        candidates = result.get_scored_candidates(
            "arrow", valid_only=False, exclude_failed=True
        )

        log.debug(
            "[step] Looking for arrows for step %d, found %d candidates",
            step_num.value,
            len(candidates),
        )

        if not candidates:
            return []

        # Anchor on the diagram when there is one, otherwise on the step
        # number, then widen the box so nearby arrows are picked up too.
        anchor = diagram.bbox if diagram else step_num.bbox
        search_region = anchor.expand(100.0)

        log.debug(
            "[step] Arrow search region for step %d: %s",
            step_num.value,
            search_region,
        )

        built_arrows: list[Arrow] = []
        for nearby in filter_overlapping(candidates, search_region):
            log.debug(
                "[step]   Arrow candidate at %s, overlaps=True, score=%.2f",
                nearby.bbox,
                nearby.score,
            )
            try:
                built = result.build(nearby)
            except CandidateFailedError:
                # Expected when overlapping arrows share source blocks and
                # this one lost the conflict.
                log.debug(
                    "[step]   Arrow candidate at %s failed (conflict), skipping",
                    nearby.bbox,
                )
            else:
                assert isinstance(built, Arrow)
                built_arrows.append(built)

        log.debug(
            "[step] Found %d arrows for step %d",
            len(built_arrows),
            step_num.value,
        )
        return built_arrows
578

579
    def _get_subassemblies_for_step(
        self,
        step_num: StepNumber,
        diagram: Diagram | None,
        result: ClassificationResult,
    ) -> list[SubAssembly]:
        """Find subassemblies associated with this step.

        The search area is the diagram bbox (or the step number bbox when
        there is no diagram) expanded by 150.0. Each overlapping subassembly
        candidate is built on a best-effort basis: a failure never aborts
        the step, it only drops that subassembly.

        Args:
            step_num: The step number element.
            diagram: The diagram element (if any).
            result: Classification result containing subassembly candidates.

        Returns:
            List of SubAssembly elements for this step (possibly empty).
        """
        subassembly_candidates = result.get_scored_candidates(
            "subassembly", valid_only=False, exclude_failed=True
        )

        log.debug(
            "[step] Looking for subassemblies for step %d, found %d candidates",
            step_num.value,
            len(subassembly_candidates),
        )

        if not subassembly_candidates:
            return []

        # Determine search region: prefer diagram area, fallback to step area
        search_bbox = diagram.bbox if diagram else step_num.bbox

        # Use a larger margin than for arrows since subassemblies can be
        # positioned further from the main diagram.
        search_region = search_bbox.expand(150.0)

        log.debug(
            "[step] SubAssembly search region for step %d: %s",
            step_num.value,
            search_region,
        )

        subassemblies: list[SubAssembly] = []
        overlapping_candidates = filter_overlapping(
            subassembly_candidates, search_region
        )

        for candidate in overlapping_candidates:
            log.debug(
                "[step]   SubAssembly candidate at %s, overlaps=True, score=%.2f",
                candidate.bbox,
                candidate.score,
            )
            try:
                subassembly = result.build(candidate)
                assert isinstance(subassembly, SubAssembly)
            except CandidateFailedError as e:
                # Expected outcome for a candidate that cannot be built;
                # same handling as for arrows.
                log.debug(
                    "[step]   Failed to build subassembly at %s: %s",
                    candidate.bbox,
                    e,
                )
            except Exception:
                # Still best-effort (the step is built without this
                # subassembly), but anything other than a candidate failure
                # is unexpected, so surface it with a traceback instead of
                # hiding it at DEBUG level.
                log.warning(
                    "[step]   Unexpected error building subassembly at %s",
                    candidate.bbox,
                    exc_info=True,
                )
            else:
                subassemblies.append(subassembly)

        log.debug(
            "[step] Found %d subassemblies for step %d",
            len(subassemblies),
            step_num.value,
        )
        return subassemblies
655

656
    def _compute_step_bbox(
        self,
        step_num: StepNumber,
        parts_list: PartsList | None,
        diagram: Diagram | None,
        page_bbox: BBox,
    ) -> BBox:
        """Union the bboxes of a step's parts and clip the result to the page.

        Args:
            step_num: The step number element (always included).
            parts_list: The parts list, included when present.
            diagram: The diagram element, included when present.
            page_bbox: The page bounding box to clip to.

        Returns:
            The combined bounding box, clipped to ``page_bbox``.
        """
        member_bboxes = [step_num.bbox]
        member_bboxes.extend(
            element.bbox for element in (parts_list, diagram) if element
        )
        return BBox.union_all(member_bboxes).clip_to(page_bbox)
685

686
    def _deduplicate_and_assign_diagrams(
        self, candidates: list[Candidate], result: ClassificationResult
    ) -> list[Candidate]:
        """Select the best Step candidates, ensuring each step number is unique.

        Despite the name, no diagrams are assigned here: diagrams are found
        at build time, not during scoring, to allow rotation symbols to
        claim small images first.

        Selection happens in two passes:

        1. For every step number value, keep the highest-scoring candidate
           (the first one wins on a tie). Candidates whose step number value
           cannot be read from a ``Text`` source block are dropped.
        2. Hand out parts lists best-pairing-first (ordered by
           ``_StepScore.sort_key``). A parts list that contains parts can be
           claimed by one step only; a later step that wanted the same one
           is rebuilt as a step without a parts list.

        Args:
            candidates: All possible Step candidates.
            result: Classification result (unused, kept for API compatibility).

        Returns:
            One Step candidate per step number value, in the order in which
            the step number values were first seen in ``candidates``.
        """
        # Pass 1: best candidate per step number value.
        best_by_step_value: dict[int, Candidate] = {}

        for candidate in candidates:
            score = candidate.score_details
            assert isinstance(score, _StepScore)

            source_blocks = score.step_number_candidate.source_blocks
            if not source_blocks or not isinstance(source_blocks[0], Text):
                continue

            step_value = extract_step_number_value(source_blocks[0].text)
            if step_value is None:
                continue

            existing = best_by_step_value.get(step_value)
            if existing is None or candidate.score > existing.score:
                best_by_step_value[step_value] = candidate

        if not best_by_step_value:
            return []

        def claim_priority(entry: tuple[int, Candidate]) -> tuple[float, int]:
            details = entry[1].score_details
            assert isinstance(details, _StepScore)
            return details.sort_key()

        # Pass 2: resolve parts list conflicts. Strongest pairings claim
        # first so a weak pairing cannot take a parts list away from the
        # step it actually belongs to.
        resolved: dict[int, Candidate] = {}
        used_parts_list_ids: set[int] = set()

        for step_value, candidate in sorted(
            best_by_step_value.items(), key=claim_priority
        ):
            score = candidate.score_details
            assert isinstance(score, _StepScore)

            parts_list_candidate = score.parts_list_candidate
            if parts_list_candidate is not None:
                parts_details = parts_list_candidate.score_details
                # Only parts lists that actually contain parts are treated
                # as exclusive.
                has_parts = (
                    isinstance(parts_details, _PartsListScore)
                    and len(parts_details.part_candidates) > 0
                )
                if has_parts:
                    parts_list_id = id(parts_list_candidate)
                    if parts_list_id in used_parts_list_ids:
                        parts_list_candidate = None
                    else:
                        used_parts_list_ids.add(parts_list_id)

            if parts_list_candidate is not score.parts_list_candidate:
                # The parts list was taken by a better pairing. Rebuild the
                # candidate as a plain "no parts list" step: pairing scores
                # reset to 0.0 and the bbox shrunk back to the step number,
                # so it no longer covers a parts list it does not own.
                demoted_score = _StepScore(
                    step_number_candidate=score.step_number_candidate,
                    parts_list_candidate=None,
                    has_parts_list=False,
                    step_proximity_score=0.0,
                    step_alignment_score=0.0,
                )
                candidate = Candidate(
                    bbox=score.step_number_candidate.bbox,
                    label=candidate.label,
                    score=demoted_score.overall_score(),
                    score_details=demoted_score,
                    source_blocks=candidate.source_blocks,
                )

            resolved[step_value] = candidate

            log.debug(
                "[step] Selected step %d (parts_list=%s, score=%.2f)",
                step_value,
                "yes" if parts_list_candidate is not None else "no",
                candidate.score,
            )

        # Return in first-seen order, independent of the claim order above.
        return [resolved[step_value] for step_value in best_by_step_value]
791

792

793
def assign_rotation_symbols_to_steps(
    steps: list[Step],
    rotation_symbols: list[RotationSymbol],
    max_distance: float = 300.0,
) -> None:
    """Assign rotation symbols to steps using the Hungarian algorithm.

    Builds a (rotation symbols x steps) cost matrix of Euclidean distances
    between each rotation symbol's bbox center and each step's diagram bbox
    center (or the step's own bbox center when the step has no diagram), then
    solves the resulting assignment problem with
    ``scipy.optimize.linear_sum_assignment``.

    This function mutates the Step objects in place: for every accepted pair
    it sets ``step.rotation_symbol`` and grows ``step.bbox`` to the union of
    the step bbox and the rotation symbol bbox. Steps that end up without an
    accepted pair are not modified.

    Args:
        steps: List of Step objects to assign rotation symbols to.
        rotation_symbols: List of RotationSymbol objects to assign.
        max_distance: Maximum distance for a valid assignment. Pairs with a
            distance greater than this are never assigned.

    Returns:
        None. Results are written onto the ``steps`` entries.
    """
    if not steps or not rotation_symbols:
        log.debug(
            "[step] No rotation symbol assignment needed: "
            "steps=%d, rotation_symbols=%d",
            len(steps),
            len(rotation_symbols),
        )
        return

    n_steps = len(steps)
    n_symbols = len(rotation_symbols)

    log.debug(
        "[step] Running Hungarian matching: %d steps, %d rotation symbols",
        n_steps,
        n_symbols,
    )

    # Build cost matrix: rows = rotation symbols, cols = steps.
    # Cost = distance from rotation symbol center to the step's diagram center
    # (or step bbox center if no diagram).
    cost_matrix = np.zeros((n_symbols, n_steps))

    for i, rs in enumerate(rotation_symbols):
        rs_center = rs.bbox.center
        for j, step in enumerate(steps):
            # Use diagram center if available, otherwise step bbox center
            if step.diagram:
                target_center = step.diagram.bbox.center
            else:
                target_center = step.bbox.center

            # Euclidean distance
            distance = (
                (rs_center[0] - target_center[0]) ** 2
                + (rs_center[1] - target_center[1]) ** 2
            ) ** 0.5

            cost_matrix[i, j] = distance
            log.debug(
                "[step]   Distance from rotation_symbol[%d] at %s to step[%d]: %.1f",
                i,
                rs.bbox,
                step.step_number.value,
                distance,
            )

    # Apply the max_distance threshold by replacing every cost above it with a
    # single high value, so the solver prefers in-range pairs. The unmodified
    # cost_matrix is kept for the acceptance check and for logging below.
    high_cost = max_distance * 10  # Arbitrary large value
    cost_matrix_thresholded = np.where(
        cost_matrix > max_distance, high_cost, cost_matrix
    )

    # Run Hungarian algorithm. For a rectangular matrix only
    # min(n_symbols, n_steps) pairs are returned.
    row_indices, col_indices = linear_sum_assignment(cost_matrix_thresholded)

    # Assign rotation symbols to steps based on the matching
    for row_idx, col_idx in zip(row_indices, col_indices, strict=True):
        # The solver may still return an out-of-range pair when nothing better
        # exists, so re-check against the real (unthresholded) distance.
        if cost_matrix[row_idx, col_idx] <= max_distance:
            rs = rotation_symbols[row_idx]
            step = steps[col_idx]
            step.rotation_symbol = rs
            # Expand step bbox to include the rotation symbol
            step.bbox = step.bbox.union(rs.bbox)
            log.debug(
                "[step] Assigned rotation symbol at %s to step %d "
                "(distance=%.1f, new step bbox=%s)",
                rs.bbox,
                step.step_number.value,
                cost_matrix[row_idx, col_idx],
                step.bbox,
            )
        else:
            # Report the column actually paired with this row. row_idx is an
            # index into rotation_symbols, not a position in col_indices, so
            # it must not be used to look the column up.
            log.debug(
                "[step] Skipped assignment: rotation_symbol[%d] to step[%d] "
                "distance=%.1f > max_distance=%.1f",
                row_idx,
                col_idx,
                cost_matrix[row_idx, col_idx],
                max_distance,
            )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc