• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

simonsobs / so_campaign_manager / 18854987527

27 Oct 2025 08:28PM UTC coverage: 80.311% (+0.2%) from 80.141%
18854987527

Pull #112

github

web-flow
Merge efeaa0bed into 3ef0e3272
Pull Request #112: fix: multipass entries

213 of 293 branches covered (72.7%)

Branch coverage included in aggregate %.

42 of 55 new or added lines in 15 files covered. (76.36%)

2 existing lines in 2 files now uncovered.

2418 of 2983 relevant lines covered (81.06%)

0.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.26
/src/socm/workflows/ml_mapmaking.py
1
from functools import lru_cache
1✔
2
from pathlib import Path
1✔
3
from typing import Any, List, Optional, Union
1✔
4

5
from sotodlib.core import Context
1✔
6

7
from socm.core import Workflow
1✔
8
from socm.utils.misc import get_query_from_file
1✔
9

10

11
@lru_cache(maxsize=10)
1✔
12
def _load_context(ctx_path: str) -> Context:
1✔
13
    return Context(Path(ctx_path))
1✔
14

15
class MLMapmakingWorkflow(Workflow):
1✔
16
    """
17
    A workflow for ML mapmaking.
18
    """
19

20
    area: str
1✔
21
    output_dir: str
1✔
22
    preprocess_config: str
1✔
23
    query: str = "1"
1✔
24
    name: str = "ml_mapmaking_workflow"
1✔
25
    executable: str = "so-site-pipeline"
1✔
26
    subcommand: str = "make-ml-map"
1✔
27
    datasize: int = 0
1✔
28
    comps: Optional[str] = "TQU"
1✔
29
    wafers: Optional[str] = None
1✔
30
    bands: Optional[str] = None
1✔
31
    nmat: Optional[str] = "corr"
1✔
32
    max_dets: Optional[int] = None
1✔
33
    site: Optional[str] = "so_lat"
1✔
34
    downsample: Union[int, List[int]] = 1
1✔
35
    maxiter: Union[int, List[int]] = 500
1✔
36
    tiled: int = 1
1✔
37

38
    def model_post_init(self, __context: Any) -> None:
1✔
39
        """
40
        Post-initialization to set the context for the workflow.
41
        """
42
        ctx_file = Path(self.context.split("file://")[-1]).absolute()
1✔
43
        ctx = _load_context(str(ctx_file))
1✔
44

45
        final_query = self.query
1✔
46
        if self.query.startswith("file://"):
1✔
47
            query_path = Path(self.query.split("file://")[-1]).absolute()
×
48
            final_query = get_query_from_file(query_path)
×
49
        obs_ids = ctx.obsdb.query(final_query)
1✔
50
        for obs_id in obs_ids:
1✔
51
            self.datasize += obs_id["n_samples"]
1✔
52

53
    def get_command(self) -> str:
1✔
54
        """
55
        Get the command to run the ML mapmaking workflow.
56
        """
57
        command = f"srun --cpu_bind=cores --export=ALL --ntasks-per-node={self.resources['ranks']} --cpus-per-task={self.resources['threads']} {self.executable} {self.subcommand} "
1✔
58
        command += " ".join(self.get_arguments())
1✔
59

60
        return command.strip()
1✔
61

62
    def get_arguments(self) -> List[str]:
1✔
63
        """
64
        Get the command to run the ML mapmaking workflow.
65
        """
66
        area = Path(self.area.split("file://")[-1])
1✔
67
        final_query = self.query
1✔
68
        if self.query.startswith("file://"):
1✔
69
            final_query = Path(self.query.split("file://")[-1]).absolute()
×
70
            final_query = f"{final_query.absolute()}"
×
71
        preprocess_config = Path(self.preprocess_config.split("file://")[-1])
1✔
72

73
        arguments = [final_query, f"{area.absolute()}", self.output_dir, f"{preprocess_config.absolute()}"]
1✔
74
        sorted_workflow = dict(sorted(self.model_dump(exclude_unset=True).items()))
1✔
75

76
        for k, v in sorted_workflow.items():
1✔
77
            if isinstance(v, str) and v.startswith("file://"):
1✔
78
                v = Path(v.split("file://")[-1]).absolute()
×
79
            elif isinstance(v, list):
1✔
NEW
80
                v = ",".join([str(item) for item in v])
×
81
            if k not in [
1✔
82
                "area",
83
                "output_dir",
84
                "executable",
85
                "query",
86
                "output_dir",
87
                "id",
88
                "environment",
89
                "resources",
90
                "datasize",
91
                "preprocess_config"
92
            ]:
93
                arguments.append(f"--{k}={v}")
1✔
94
        return arguments
1✔
95

96
    @classmethod
1✔
97
    def get_workflows(
1✔
98
        cls, descriptions: Union[List[dict], dict]
99
    ) -> List["MLMapmakingWorkflow"]:
100
        """
101
        Create a list of MLMapmakingWorkflow instances from the provided descriptions.
102
        """
103
        if isinstance(descriptions, dict):
×
104
            descriptions = [descriptions]
×
105

106
        workflows = []
×
107
        for desc in descriptions:
×
108
            workflow = cls(**desc)
×
109
            workflows.append(workflow)
×
110

111
        return workflows
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc