• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

bramp / build-along / 19996566426

07 Dec 2025 12:42AM UTC coverage: 90.333% (-0.2%) from 90.506%
19996566426

push

github

bramp
Refactor rotation symbol assignment to use Hungarian algorithm

- RotationSymbolClassifier now claims source_blocks (Drawing elements)
- Removed rotation_symbol from composite_labels in candidate.py
- Build rotation symbols in PageClassifier before steps
- Steps are built without rotation symbols initially
- Hungarian algorithm (scipy.optimize.linear_sum_assignment) pairs
  rotation symbols to steps based on distance to diagram center
- Step bbox is expanded to include assigned rotation symbol
- Updated golden files to include rotation symbols in steps
- Clarified build_all() docstring in LabelClassifier

91 of 101 new or added lines in 8 files covered. (90.1%)

16 existing lines in 2 files now uncovered.

10522 of 11648 relevant lines covered (90.33%)

0.9 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.81
/src/build_a_long/pdf_extract/classifier/steps/step_classifier.py
1
"""
2
Step classifier.
3

4
Purpose
5
-------
6
Identify complete Step structures by combining step_number, parts_list, and diagram
7
elements. A Step represents a single building instruction comprising:
8
- A StepNumber label
9
- An optional PartsList (the parts needed for this step)
10
- A Diagram (the main instruction graphic showing what to build)
11

12
We look for step_numbers and attempt to pair them with nearby parts_lists and
13
identify the appropriate diagram region for each step.
14

15
Debugging
16
---------
17
Set environment variables to aid investigation without code changes:
18

19
- LOG_LEVEL=DEBUG
20
    Enables DEBUG-level logging (if not already configured by caller).
21
"""
22

23
import logging
1✔
24

25
import numpy as np
1✔
26
from scipy.optimize import linear_sum_assignment
1✔
27

28
from build_a_long.pdf_extract.classifier.candidate import Candidate
1✔
29
from build_a_long.pdf_extract.classifier.classification_result import (
1✔
30
    CandidateFailedError,
31
    ClassificationResult,
32
)
33
from build_a_long.pdf_extract.classifier.label_classifier import (
1✔
34
    LabelClassifier,
35
)
36
from build_a_long.pdf_extract.classifier.parts.parts_list_classifier import (
1✔
37
    _PartsListScore,
38
)
39
from build_a_long.pdf_extract.classifier.score import Score, Weight
1✔
40
from build_a_long.pdf_extract.classifier.text import (
1✔
41
    extract_step_number_value,
42
)
43
from build_a_long.pdf_extract.extractor.bbox import BBox
1✔
44
from build_a_long.pdf_extract.extractor.lego_page_elements import (
1✔
45
    Arrow,
46
    Diagram,
47
    PartsList,
48
    RotationSymbol,
49
    Step,
50
    StepNumber,
51
    SubAssembly,
52
)
53
from build_a_long.pdf_extract.extractor.page_blocks import Text
1✔
54

55
log = logging.getLogger(__name__)
1✔
56

57

58
class _StepScore(Score):
    """Score details for one StepNumber / PartsList pairing."""

    step_number_candidate: Candidate
    """The step number candidate this step is associated with."""

    parts_list_candidate: Candidate | None
    """The parts list candidate paired with this step (if any)."""

    has_parts_list: bool
    """Whether this step has an associated parts list."""

    step_proximity_score: float
    """Score based on proximity to the PartsList above (0.0-1.0).
    1.0 for closest proximity, 0.0 if very far. 0.0 if no parts list."""

    step_alignment_score: float
    """Score based on left-edge alignment with PartsList above (0.0-1.0).
    1.0 is perfect alignment, 0.0 is very misaligned. 0.0 if no parts list."""

    def score(self) -> Weight:
        """Expose the pairing quality (see overall_score) as this score's weight."""
        return self.overall_score()

    def overall_score(self) -> float:
        """Rate the pairing from the parts-list information alone.

        Without a parts list the result is a flat 0.3. With one, the result is
        0.5 plus half of the mean of the proximity and alignment scores, so a
        paired step always outranks an unpaired one. Diagrams play no part
        here: they are located at build time so that rotation symbols can
        claim small images first.
        """
        if not self.has_parts_list:
            # Flat, lower score for a step that stands alone
            return 0.3

        # Fixed 0.5 for having a parts list, remainder scaled by pairing quality
        mean_pairing = (self.step_proximity_score + self.step_alignment_score) / 2.0
        return 0.5 + 0.5 * mean_pairing

    def sort_key(self) -> tuple[float, int]:
        """Build the key used to order step candidates.

        The first element is the negated overall score, so an ascending sort
        puts the best pairing first. The second element is the step number
        value, which breaks ties in favour of lower step numbers; it is 0 when
        the value cannot be read from the step number candidate.
        """
        tie_breaker = 0

        # Only the first source block of the step number is consulted
        blocks = self.step_number_candidate.source_blocks
        if blocks and isinstance(blocks[0], Text):
            parsed = extract_step_number_value(blocks[0].text)
            if parsed is not None:
                tie_breaker = parsed

        return (-self.overall_score(), tie_breaker)
×
118

119

120
class StepClassifier(LabelClassifier):
1✔
121
    """Classifier for complete Step structures."""
122

123
    output = "step"
1✔
124
    requires = frozenset(
1✔
125
        {"step_number", "parts_list", "diagram", "rotation_symbol", "subassembly"}
126
    )
127

128
    def _score(self, result: ClassificationResult) -> None:
        """Enumerate StepNumber/PartsList pairings and register the winners.

        Every non-failed step_number candidate is paired with every non-failed
        parts_list candidate, plus once with no parts list at all. The
        resulting candidates are deduplicated and the survivors are added to
        ``result``. Nothing is added when there are no step_number candidates.
        """
        page = result.page_data

        # Work with candidates here, not with constructed elements
        step_numbers = result.get_scored_candidates(
            "step_number", valid_only=False, exclude_failed=True
        )
        if not step_numbers:
            return

        parts_lists = result.get_scored_candidates(
            "parts_list",
            valid_only=False,
            exclude_failed=True,
        )

        log.debug(
            "[step] page=%s step_candidates=%d parts_list_candidates=%d",
            page.page_number,
            len(step_numbers),
            len(parts_lists),
        )

        # Diagrams are not part of the pairing at this stage
        pairings: list[Candidate] = []
        for step_number in step_numbers:
            # Each parts list in turn, then None as the "no parts list" fallback
            for partner in (*parts_lists, None):
                paired = self._create_step_candidate(step_number, partner, result)
                if paired:
                    pairings.append(paired)

        # Reduce to one candidate per step number value
        winners = self._deduplicate_and_assign_diagrams(pairings, result)
        for winner in winners:
            result.add_candidate(winner)

        log.debug(
            "[step] Created %d deduplicated step candidates (from %d possibilities)",
            len(winners),
            len(pairings),
        )
185

186
    def build(self, candidate: Candidate, result: ClassificationResult) -> Step:
        """Turn one step candidate into a Step element.

        The step number (and the parts list, when the candidate has one) are
        built from the candidates referenced by the score details. The
        diagram is then chosen and built, followed by the arrows and
        subassemblies found near it. ``rotation_symbol`` is always left as
        None here; it is filled in later.

        Args:
            candidate: A step candidate whose score_details is a _StepScore
            result: Classification result used to build the child elements

        Returns:
            The assembled Step
        """
        details = candidate.score_details
        assert isinstance(details, _StepScore)

        # Step number comes from the parent candidate
        step_num = result.build(details.step_number_candidate)
        assert isinstance(step_num, StepNumber)

        # Parts list is optional
        parts_list = None
        if details.parts_list_candidate:
            parts_list = result.build(details.parts_list_candidate)
            assert isinstance(parts_list, PartsList)

        # The diagram must be resolved before the bbox and the neighbour searches
        diagram = self._find_and_build_diagram_for_step(step_num, parts_list, result)

        # Bounding box covers step number + parts list + diagram
        step_bbox = self._compute_step_bbox(
            step_num, parts_list, diagram, result.page_data.bbox
        )

        # Arrows first, then subassemblies
        arrows = self._get_arrows_for_step(step_num, diagram, result)
        subassemblies = self._get_subassemblies_for_step(step_num, diagram, result)

        return Step(
            bbox=step_bbox,
            step_number=step_num,
            parts_list=parts_list,
            diagram=diagram,
            rotation_symbol=None,
            arrows=arrows,
            subassemblies=subassemblies,
        )
229

230
    def _find_and_build_diagram_for_step(
        self,
        step_num: StepNumber,
        parts_list: PartsList | None,
        result: ClassificationResult,
    ) -> Diagram | None:
        """Find and build the best diagram for this step.

        This is called at build time, after rotation symbols have been built,
        so they've already claimed any small images they need. This ensures
        the diagram doesn't incorrectly cluster rotation symbol images.

        Candidates are tried from best to worst position score. If building
        one fails with CandidateFailedError, the next candidate that still
        meets the minimum score is tried instead of giving up on the step.

        Args:
            step_num: The built step number element
            parts_list: The built parts list element (if any); not used by the
                current scoring
            result: Classification result containing diagram candidates

        Returns:
            The built Diagram element, or None if no suitable diagram found
        """
        MIN_DIAGRAM_SCORE = 0.2  # Below this a diagram is not a match

        # Get all non-failed diagram candidates
        diagram_candidates = result.get_scored_candidates(
            "diagram", valid_only=False, exclude_failed=True
        )

        # Filter to only candidates that haven't been built yet
        available_candidates = [c for c in diagram_candidates if c.constructed is None]

        if not available_candidates:
            log.debug(
                "[step] No diagram candidates available for step %d",
                step_num.value,
            )
            return None

        # Score each candidate based on position relative to the step number
        step_bbox = step_num.bbox
        scored: list[tuple[float, Candidate]] = []
        for candidate in available_candidates:
            score = self._score_step_diagram_pair(step_bbox, candidate.bbox)

            log.debug(
                "[step] Diagram candidate at %s for step %d: score=%.2f",
                candidate.bbox,
                step_num.value,
                score,
            )
            scored.append((score, candidate))

        # Best first. The sort is stable (also with reverse=True), so equally
        # scored candidates keep their original relative order.
        scored.sort(key=lambda pair: pair[0], reverse=True)

        best_score = scored[0][0]
        if best_score < MIN_DIAGRAM_SCORE:
            log.debug(
                "[step] No suitable diagram found for step %d (best_score=%.2f)",
                step_num.value,
                best_score,
            )
            return None

        for score, candidate in scored:
            if score < MIN_DIAGRAM_SCORE:
                # Everything after this point scores lower still
                break

            try:
                diagram_elem = result.build(candidate)
            except CandidateFailedError as e:
                log.debug(
                    "[step] Failed to build diagram for step %d: %s",
                    step_num.value,
                    e,
                )
                continue

            assert isinstance(diagram_elem, Diagram)
            log.debug(
                "[step] Built diagram at %s for step %d (score=%.2f)",
                diagram_elem.bbox,
                step_num.value,
                score,
            )
            return diagram_elem

        log.debug(
            "[step] No buildable diagram found for step %d",
            step_num.value,
        )
        return None
×
310

311
    def _create_step_candidate(
        self,
        step_candidate: Candidate,
        parts_list_candidate: Candidate | None,
        result: ClassificationResult,
    ) -> Candidate | None:
        """Create a Step candidate (without diagram assignment).

        Diagrams are found at build time, not during scoring, to allow
        rotation symbols to claim small images first.

        Proximity and alignment are only scored when the parts list ends at
        or above the top of the step number (with a small tolerance);
        otherwise both stay at 0.0. Both scores are kept within 0.0-1.0.

        Args:
            step_candidate: The StepNumber candidate for this step
            parts_list_candidate: The PartsList candidate to pair with (or None)
            result: Classification result (not used by this method)

        Returns:
            The created Candidate with score but no construction. The current
            implementation always returns a Candidate.
        """
        ABOVE_EPS = 2.0  # Small epsilon for "above" check
        ALIGNMENT_THRESHOLD_MULTIPLIER = 1.0  # Max horizontal offset
        DISTANCE_THRESHOLD_MULTIPLIER = 1.0  # Max vertical distance

        step_bbox = step_candidate.bbox
        parts_list_bbox = parts_list_candidate.bbox if parts_list_candidate else None

        # Calculate pairing scores if there's a parts_list above the step
        proximity_score = 0.0
        alignment_score = 0.0

        if (
            parts_list_bbox is not None
            and parts_list_bbox.y1 <= step_bbox.y0 + ABOVE_EPS
        ):
            # Vertical gap between the parts list bottom and the step number
            # top. ABOVE_EPS lets the two overlap slightly, so this can be
            # negative by up to ABOVE_EPS.
            distance = step_bbox.y0 - parts_list_bbox.y1

            # Calculate proximity score, scaled by the step number's height
            max_distance = step_bbox.height * DISTANCE_THRESHOLD_MULTIPLIER
            if max_distance > 0:
                # Clamp on both sides: a negative distance would otherwise
                # push the score above 1.0.
                proximity_score = min(1.0, max(0.0, 1.0 - (distance / max_distance)))

            # Calculate alignment score (how well left edges align)
            max_alignment_diff = step_bbox.width * ALIGNMENT_THRESHOLD_MULTIPLIER
            left_diff = abs(parts_list_bbox.x0 - step_bbox.x0)
            if max_alignment_diff > 0:
                alignment_score = max(0.0, 1.0 - (left_diff / max_alignment_diff))

        # Create score object with candidate references
        # Diagrams are found at build time, not during scoring
        score = _StepScore(
            step_number_candidate=step_candidate,
            parts_list_candidate=parts_list_candidate,
            has_parts_list=parts_list_candidate is not None,
            step_proximity_score=proximity_score,
            step_alignment_score=alignment_score,
        )

        # Calculate combined bbox for the candidate (without diagram)
        combined_bbox = step_bbox
        if parts_list_bbox is not None:
            combined_bbox = BBox.union(combined_bbox, parts_list_bbox)

        # Create candidate
        return Candidate(
            bbox=combined_bbox,
            label="step",
            score=score.overall_score(),
            score_details=score,
            source_blocks=[],
        )
382

383
    def _score_step_diagram_pair(
1✔
384
        self,
385
        step_bbox: BBox,
386
        diagram_bbox: BBox,
387
    ) -> float:
388
        """Score how well a diagram matches a step.
389

390
        Diagrams are typically positioned to the right of and/or below the step
391
        number. This method scores based on:
392
        - Horizontal position: prefer diagrams to the right, penalize left
393
        - Vertical position: prefer diagrams below the step header
394
        - Distance: closer is better
395

396
        Args:
397
            step_bbox: The step number bounding box
398
            diagram_bbox: The diagram bounding box to score
399

400
        Returns:
401
            Score between 0.0 and 1.0 (higher is better match)
402
        """
403
        # Reference point: bottom-right of step number
404
        ref_x = step_bbox.x1
1✔
405
        ref_y = step_bbox.y1
1✔
406

407
        # TODO Move all these constants into config, or make them adaptive?
408

409
        # Horizontal score
410
        # Diagrams to the right are preferred, but allow some overlap
411
        x_offset = diagram_bbox.x0 - ref_x
1✔
412

413
        if x_offset >= -50:
1✔
414
            # Diagram starts to the right or slightly overlapping - good
415
            # Score decreases slightly with distance to the right
416
            x_score = max(0.5, 1.0 - abs(x_offset) / 400.0)
1✔
417
        elif x_offset >= -200:
1✔
418
            # Diagram is moderately to the left - acceptable
419
            x_score = 0.3 + 0.2 * (1.0 + x_offset / 200.0)
1✔
420
        else:
421
            # Diagram is far to the left - poor match
422
            x_score = max(0.1, 0.3 + x_offset / 400.0)
1✔
423

424
        # Vertical score
425
        # Diagrams below the step header are preferred
426
        y_offset = diagram_bbox.y0 - ref_y
1✔
427

428
        if y_offset >= -30:
1✔
429
            # Diagram starts below or slightly overlapping - good
430
            # Score decreases with vertical distance
431
            y_score = max(0.3, 1.0 - abs(y_offset) / 300.0)
1✔
432
        elif y_offset >= -100:
1✔
433
            # Diagram is moderately above - less good but acceptable
434
            y_score = 0.2 + 0.3 * (1.0 + y_offset / 100.0)
1✔
435
        else:
436
            # Diagram is far above - poor match
437
            y_score = max(0.05, 0.2 + y_offset / 300.0)
1✔
438

439
        # Combined score - weight both dimensions equally
440
        score = 0.5 * x_score + 0.5 * y_score
1✔
441

442
        return score
1✔
443

444
    def _get_rotation_symbol_for_step(
        self,
        step_bbox: BBox,
        result: ClassificationResult,
    ) -> RotationSymbol | None:
        """Pick the already-built rotation symbol that overlaps this step.

        Among the valid rotation_symbol candidates whose bbox overlaps
        ``step_bbox``, the one with the highest score (strictly above 0.0)
        wins. Its constructed element is returned; nothing is built here.

        Args:
            step_bbox: The step's bounding box (including step_num, parts_list,
                and diagram)
            result: Classification result containing rotation symbol candidates

        Returns:
            Single RotationSymbol element for this step, or None if not found
        """
        # valid_only=True restricts the lookup to successfully constructed symbols
        built_symbols = result.get_scored_candidates(
            "rotation_symbol", valid_only=True
        )

        log.debug(
            "[step] Looking for rotation symbols in step bbox %s, "
            "found %d built candidates",
            step_bbox,
            len(built_symbols),
        )

        if not built_symbols:
            return None

        # Highest-scoring overlapping candidate wins
        winner = None
        winner_score = 0.0
        for symbol_candidate in built_symbols:
            overlaps = symbol_candidate.bbox.overlaps(step_bbox)
            log.debug(
                "[step]   Candidate at %s, overlaps=%s, score=%.2f, constructed=%s",
                symbol_candidate.bbox,
                overlaps,
                symbol_candidate.score,
                symbol_candidate.constructed is not None,
            )
            if not (overlaps and symbol_candidate.score > winner_score):
                continue
            winner = symbol_candidate
            winner_score = symbol_candidate.score

        rotation_symbol = winner.constructed if winner else None
        if rotation_symbol is not None:
            assert isinstance(rotation_symbol, RotationSymbol)
            log.debug(
                "[step] Found rotation symbol at %s (score=%.2f)",
                rotation_symbol.bbox,
                winner_score,
            )
            return rotation_symbol

        log.debug("[step] No rotation symbol found in step bbox %s", step_bbox)
        return None
×
509

510
    def _get_arrows_for_step(
        self,
        step_num: StepNumber,
        diagram: Diagram | None,
        result: ClassificationResult,
    ) -> list[Arrow]:
        """Collect the arrows that lie close to this step.

        The search region is the diagram's bbox (or the step number's bbox
        when there is no diagram) grown by 100 units on every side. Every
        non-failed arrow candidate overlapping that region is built; those
        that raise CandidateFailedError during the build are skipped.

        Args:
            step_num: The step number element
            diagram: The diagram element (if any)
            result: Classification result containing arrow candidates

        Returns:
            List of Arrow elements for this step
        """
        candidates = result.get_scored_candidates(
            "arrow", valid_only=False, exclude_failed=True
        )

        log.debug(
            "[step] Looking for arrows for step %d, found %d candidates",
            step_num.value,
            len(candidates),
        )

        if not candidates:
            return []

        # Diagram area is preferred; the step number is the fallback anchor
        anchor = diagram.bbox if diagram else step_num.bbox

        # Arrows can reach well beyond the diagram, hence the generous margin
        margin = 100
        search_region = BBox(
            x0=anchor.x0 - margin,
            y0=anchor.y0 - margin,
            x1=anchor.x1 + margin,
            y1=anchor.y1 + margin,
        )

        log.debug(
            "[step] Arrow search region for step %d: %s",
            step_num.value,
            search_region,
        )

        found: list[Arrow] = []
        for arrow_candidate in candidates:
            overlaps = arrow_candidate.bbox.overlaps(search_region)
            log.debug(
                "[step]   Arrow candidate at %s, overlaps=%s, score=%.2f",
                arrow_candidate.bbox,
                overlaps,
                arrow_candidate.score,
            )
            if not overlaps:
                continue

            try:
                arrow = result.build(arrow_candidate)
                assert isinstance(arrow, Arrow)
                found.append(arrow)
            except CandidateFailedError:
                # Expected when overlapping arrows share source blocks and this
                # one lost the conflict - skip it
                log.debug(
                    "[step]   Arrow candidate at %s failed (conflict), skipping",
                    arrow_candidate.bbox,
                )

        log.debug(
            "[step] Found %d arrows for step %d",
            len(found),
            step_num.value,
        )
        return found
1✔
591

592
    def _get_subassemblies_for_step(
        self,
        step_num: StepNumber,
        diagram: Diagram | None,
        result: ClassificationResult,
    ) -> list[SubAssembly]:
        """Find subassemblies associated with this step.

        Looks for subassembly candidates that are positioned near the step's
        diagram or step number. SubAssemblies are callout boxes showing
        sub-assemblies.

        Building is best-effort: a candidate that fails to build is skipped.
        A CandidateFailedError is logged at DEBUG level; any other exception
        is logged at WARNING level with its traceback so that real defects
        are not hidden.

        Args:
            step_num: The step number element
            diagram: The diagram element (if any)
            result: Classification result containing subassembly candidates

        Returns:
            List of SubAssembly elements for this step
        """
        subassembly_candidates = result.get_scored_candidates(
            "subassembly", valid_only=False, exclude_failed=True
        )

        log.debug(
            "[step] Looking for subassemblies for step %d, found %d candidates",
            step_num.value,
            len(subassembly_candidates),
        )

        if not subassembly_candidates:
            return []

        # Determine search region: prefer diagram area, fallback to step area
        search_bbox = diagram.bbox if diagram else step_num.bbox

        # Expand search region to catch subassemblies near the diagram
        # Use a larger margin since subassemblies can be positioned further from
        # the main diagram
        search_region = BBox(
            x0=search_bbox.x0 - 150,
            y0=search_bbox.y0 - 150,
            x1=search_bbox.x1 + 150,
            y1=search_bbox.y1 + 150,
        )

        log.debug(
            "[step] SubAssembly search region for step %d: %s",
            step_num.value,
            search_region,
        )

        # Find subassemblies within or overlapping the search region
        subassemblies: list[SubAssembly] = []

        for candidate in subassembly_candidates:
            overlaps = candidate.bbox.overlaps(search_region)
            log.debug(
                "[step]   SubAssembly candidate at %s, overlaps=%s, score=%.2f",
                candidate.bbox,
                overlaps,
                candidate.score,
            )
            if not overlaps:
                continue

            try:
                subassembly = result.build(candidate)
                assert isinstance(subassembly, SubAssembly)
                subassemblies.append(subassembly)
            except CandidateFailedError as e:
                # Same handling as for arrows and diagrams: skip the candidate
                log.debug(
                    "[step]   Failed to build subassembly at %s: %s",
                    candidate.bbox,
                    e,
                )
            except Exception:
                # Still skip the candidate, but make the failure visible
                log.warning(
                    "[step]   Unexpected error building subassembly at %s",
                    candidate.bbox,
                    exc_info=True,
                )

        log.debug(
            "[step] Found %d subassemblies for step %d",
            len(subassemblies),
            step_num.value,
        )
        return subassemblies
1✔
673

674
    def _compute_step_bbox(
        self,
        step_num: StepNumber,
        parts_list: PartsList | None,
        diagram: Diagram | None,
        page_bbox: BBox,
    ) -> BBox:
        """Return the box enclosing all parts of the Step, limited to the page.

        The step number's bbox is always included; the parts list and diagram
        bboxes are included when those elements are present. The union is then
        clipped to ``page_bbox`` so that elements reaching slightly off-page
        (e.g., arrows in diagrams) do not enlarge the step beyond the page.

        Args:
            step_num: The step number element
            parts_list: The parts list (if any)
            diagram: The diagram element (if any)
            page_bbox: The page bounding box to clip to

        Returns:
            Combined bounding box, clipped to page bounds
        """
        optional_bboxes = (elem.bbox for elem in (parts_list, diagram) if elem)
        enclosing = BBox.union_all([step_num.bbox, *optional_bboxes])
        return enclosing.clip_to(page_bbox)
1✔
703

704
    def _deduplicate_and_assign_diagrams(
        self, candidates: list[Candidate], result: ClassificationResult
    ) -> list[Candidate]:
        """Select the best Step candidates, ensuring each step number is unique.

        Diagrams are found at build time, not during scoring, to allow
        rotation symbols to claim small images first.

        Two passes are made:

        1. For each step number value, keep the highest-scoring candidate.
           Candidates whose step number value cannot be read are dropped.
        2. Make sure a parts list that contains parts is used by one step
           only. A later step that selected the same parts list is rebuilt
           as a step without a parts list.

        Args:
            candidates: All possible Step candidates
            result: Classification result (unused, kept for API compatibility)

        Returns:
            Deduplicated list of Step candidates (one per step number value)
        """
        # First, deduplicate candidates by step number value
        # Pick the best candidate for each unique step number
        best_by_step_value: dict[int, Candidate] = {}

        for candidate in candidates:
            score = candidate.score_details
            assert isinstance(score, _StepScore)

            # Extract step number value from the first source block
            source_blocks = score.step_number_candidate.source_blocks
            if not source_blocks or not isinstance(source_blocks[0], Text):
                continue

            step_value = extract_step_number_value(source_blocks[0].text)
            if step_value is None:
                continue

            # Keep the best candidate for each step value; on equal scores the
            # first one seen stays
            existing = best_by_step_value.get(step_value)
            if existing is None or candidate.score > existing.score:
                best_by_step_value[step_value] = candidate

        # Build final candidates ensuring parts list uniqueness.
        # NOTE(review): parts lists are claimed in the order the step values
        # were first seen, not in order of score - confirm this is intended.
        selected: list[Candidate] = []
        used_parts_list_ids: set[int] = set()

        for step_value, candidate in best_by_step_value.items():
            score = candidate.score_details
            assert isinstance(score, _StepScore)

            parts_list_candidate = score.parts_list_candidate
            if parts_list_candidate is not None:
                # Only parts lists that actually contain parts are exclusive
                has_parts = False
                if isinstance(parts_list_candidate.score_details, _PartsListScore):
                    has_parts = (
                        len(parts_list_candidate.score_details.part_candidates) > 0
                    )

                if has_parts:
                    parts_list_id = id(parts_list_candidate)
                    if parts_list_id in used_parts_list_ids:
                        # Already claimed by an earlier step
                        parts_list_candidate = None
                    else:
                        used_parts_list_ids.add(parts_list_id)

            # The parts list was dropped: rebuild the candidate so that it
            # matches a step created without a parts list. The pairing scores
            # are reset and the bbox no longer covers the dropped parts list.
            if parts_list_candidate is not score.parts_list_candidate:
                updated_score = _StepScore(
                    step_number_candidate=score.step_number_candidate,
                    parts_list_candidate=None,
                    has_parts_list=False,
                    step_proximity_score=0.0,
                    step_alignment_score=0.0,
                )
                candidate = Candidate(
                    bbox=score.step_number_candidate.bbox,
                    label=candidate.label,
                    score=updated_score.overall_score(),
                    score_details=updated_score,
                    source_blocks=candidate.source_blocks,
                )

            selected.append(candidate)

            # Log selection
            log.debug(
                "[step] Selected step %d (parts_list=%s, score=%.2f)",
                step_value,
                "yes" if parts_list_candidate is not None else "no",
                candidate.score,
            )

        return selected
1✔
809

810

811
def assign_rotation_symbols_to_steps(
    steps: list[Step],
    rotation_symbols: list[RotationSymbol],
    max_distance: float = 300.0,
) -> None:
    """Assign rotation symbols to steps using the Hungarian algorithm.

    Builds a (rotation symbols x steps) cost matrix where each entry is the
    Euclidean distance from the rotation symbol's bbox center to the step's
    diagram bbox center (or to the step's own bbox center when the step has no
    diagram), then solves the minimum-cost bipartite matching with
    ``scipy.optimize.linear_sum_assignment``.

    This function mutates the Step objects in place: for every accepted pair it
    sets ``step.rotation_symbol`` and replaces ``step.bbox`` with the union of
    the step bbox and the rotation symbol bbox. Each step receives at most one
    rotation symbol and each rotation symbol is given to at most one step.

    Args:
        steps: List of Step objects to assign rotation symbols to.
        rotation_symbols: List of RotationSymbol objects to assign.
        max_distance: Maximum distance for a valid assignment. Pairs with
            distance greater than this will not be matched.

    Returns:
        None. Results are written onto the Step objects.
    """
    if not steps or not rotation_symbols:
        log.debug(
            "[step] No rotation symbol assignment needed: "
            "steps=%d, rotation_symbols=%d",
            len(steps),
            len(rotation_symbols),
        )
        return

    n_steps = len(steps)
    n_symbols = len(rotation_symbols)

    log.debug(
        "[step] Running Hungarian matching: %d steps, %d rotation symbols",
        n_steps,
        n_symbols,
    )

    # The target point of a step does not depend on the rotation symbol, so
    # compute it once per step instead of once per (symbol, step) pair.
    # Use diagram center if available, otherwise step bbox center.
    target_centers = [
        step.diagram.bbox.center if step.diagram else step.bbox.center
        for step in steps
    ]

    # Build cost matrix: rows = rotation symbols, cols = steps
    cost_matrix = np.zeros((n_symbols, n_steps))

    for i, rs in enumerate(rotation_symbols):
        rs_center = rs.bbox.center
        for j, step in enumerate(steps):
            target_center = target_centers[j]

            # Euclidean distance
            distance = (
                (rs_center[0] - target_center[0]) ** 2
                + (rs_center[1] - target_center[1]) ** 2
            ) ** 0.5

            cost_matrix[i, j] = distance
            log.debug(
                "[step]   Distance from rotation_symbol[%d] at %s to step[%d]: %.1f",
                i,
                rs.bbox,
                step.step_number.value,
                distance,
            )

    # Replace every out-of-range cost with one large sentinel so the solver
    # avoids those pairs whenever it can. The solver always returns
    # min(n_symbols, n_steps) pairs, so the sentinel must exceed the largest
    # possible sum of in-range costs (n_pairs * max_distance); otherwise it
    # could trade several valid pairs for a single out-of-range one. The
    # historical factor of 10 is kept as a floor so small inputs are unchanged.
    n_pairs = min(n_symbols, n_steps)
    high_cost = max_distance * max(10, n_pairs + 1)
    cost_matrix_thresholded = np.where(
        cost_matrix > max_distance, high_cost, cost_matrix
    )

    # Run Hungarian algorithm
    row_indices, col_indices = linear_sum_assignment(cost_matrix_thresholded)

    # Assign rotation symbols to steps based on the matching
    for row_idx, col_idx in zip(row_indices, col_indices, strict=True):
        # The solver may still return sentinel pairs when nothing better is
        # available; re-check against the real (unthresholded) distance.
        if cost_matrix[row_idx, col_idx] <= max_distance:
            rs = rotation_symbols[row_idx]
            step = steps[col_idx]
            step.rotation_symbol = rs
            # Expand step bbox to include the rotation symbol
            step.bbox = step.bbox.union(rs.bbox)
            log.debug(
                "[step] Assigned rotation symbol at %s to step %d "
                "(distance=%.1f, new step bbox=%s)",
                rs.bbox,
                step.step_number.value,
                cost_matrix[row_idx, col_idx],
                step.bbox,
            )
        else:
            # Log the column actually paired with this row. row_idx is a row
            # label, not a position in col_indices, so it must not be used to
            # index col_indices.
            log.debug(
                "[step] Skipped assignment: rotation_symbol[%d] to step[%d] "
                "distance=%.1f > max_distance=%.1f",
                row_idx,
                col_idx,
                cost_matrix[row_idx, col_idx],
                max_distance,
            )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc