• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

simonsobs / so_campaign_manager / 18854987527

27 Oct 2025 08:28PM UTC coverage: 80.311% (+0.2%) from 80.141%
18854987527

Pull #112

github

web-flow
Merge efeaa0bed into 3ef0e3272
Pull Request #112: fix: multipass entries

213 of 293 branches covered (72.7%)

Branch coverage included in aggregate %.

42 of 55 new or added lines in 15 files covered. (76.36%)

2 existing lines in 2 files now uncovered.

2418 of 2983 relevant lines covered (81.06%)

0.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.16
/src/socm/workflows/ml_null_tests/base.py
1
from datetime import timedelta
1✔
2
from pathlib import Path
1✔
3
from typing import Any, Dict, List, Optional, Union
1✔
4

5
from sotodlib.core import Context
1✔
6

7
from socm.utils.misc import get_query_from_file
1✔
8
from socm.workflows import MLMapmakingWorkflow
1✔
9

10

11
class NullTestWorkflow(MLMapmakingWorkflow):
1✔
12
    """
13
    A workflow for null tests.
14
    """
15

16
    area: str
1✔
17
    output_dir: str
1✔
18
    query: str = "1"
1✔
19
    name: str = "lat_null_test_workflow"
1✔
20
    datasize: int = 0
1✔
21
    chunk_nobs: Optional[int] = None
1✔
22
    chunk_duration: Optional[timedelta] = None
1✔
23

24
    def model_post_init(self, __context: Any) -> None:
1✔
25
        """
26
        Post-initialization to set the context for the workflow and distribute the
27
        observations across splits.
28
        """
29
        ctx_file = Path(self.context.split("file://")[-1]).absolute()
1✔
30
        ctx = Context(ctx_file)
1✔
31
        final_query = self.query
1✔
32
        if self.query.startswith("file://"):
1✔
33
            query_path = Path(self.query.split("file://")[-1]).absolute()
1✔
34
            final_query = get_query_from_file(query_path)
1✔
35
        obs_ids = ctx.obsdb.query(final_query)
1✔
36
        obs_info = dict()
1✔
37
        for obs_id in obs_ids:
1✔
38
            self.datasize += obs_id["n_samples"]
1✔
39
            obs_info[obs_id["obs_id"]] = {
1✔
40
                "start_time": obs_id["timestamp"],
41
                "wafer_list": obs_id["wafer_slots_list"].split(","),
42
                "tube_slot": obs_id.get("tube_slot", "st1"),
43
                "az_center": obs_id["az_center"],
44
                "el_center": obs_id["el_center"],
45
                "pwv": obs_id.get("pwv", 0),
46
            }
47
        # Ensure obs_ids are sorted by their timestamp
48
        # Order the obs_ids based on their timestamp it is in the obs_meta.obs_info.timestamp
49

50
        self._splits = self._get_splits(ctx, obs_info)
1✔
51

52
    def _get_num_chunks(self, num_obs: int) -> int:
1✔
53
        num_chunks = (
1✔
54
            num_obs + self.chunk_nobs - 1
55
        ) // self.chunk_nobs  # Ceiling division
56
        return num_chunks
1✔
57

58
    def _get_splits(
1✔
59
        self, ctx: Context, obs_info: Dict[str, Dict[str, Union[float, str]]]
60
    ) -> List[List[str]]:
61
        """
62
        Distribute the observations across splits based on the context and observation IDs.
63
        """
64
        if self.__class__.__name__ != "NullTestWorkflow":
1✔
65
            raise NotImplementedError(
×
66
                "This method should be implemented in subclasses."
67
            )
68
        else:
69
            pass
1✔
70

71
    @classmethod
1✔
72
    def get_workflows(cls, desc: Dict[str, Any]) -> List["NullTestWorkflow"]:
1✔
73
        """
74
        Distribute the observations across splits based on the context and observation IDs.
75
        """
NEW
76
        if cls.__name__ != "NullTestWorkflow":
×
NEW
77
            raise NotImplementedError(
×
78
                "This method should be implemented in subclasses."
79
            )
80
        else:
NEW
81
            pass
×
82

83
    def get_arguments(self) -> List[str]:
1✔
84
        """
85
        Get the command to run the ML mapmaking workflow.
86
        """
87
        area = Path(self.area.split("file://")[-1])
1✔
88
        query = Path(self.query.split("file://")[-1])
1✔
89
        preprocess_config = Path(self.preprocess_config.split("file://")[-1])
1✔
90

91
        arguments = [f"{query.absolute()}", f"{area.absolute()}", self.output_dir, f"{preprocess_config.absolute()}"]
1✔
92
        sorted_workflow = dict(sorted(self.model_dump(exclude_unset=True).items()))
1✔
93

94
        for k, v in sorted_workflow.items():
1✔
95
            if isinstance(v, str) and v.startswith("file://"):
1✔
96
                v = Path(v.split("file://")[-1]).absolute()
1✔
97
            elif isinstance(v, list):
1✔
NEW
98
                v = ",".join([str(item) for item in v])
×
99
            if k not in [
1✔
100
                "area",
101
                "output_dir",
102
                "executable",
103
                "query",
104
                "id",
105
                "environment",
106
                "resources",
107
                "datasize",
108
                "chunk_nobs",
109
                "nsplits",
110
                "wafers",
111
                "subcommand",
112
                "name",
113
                "chunk_duration",
114
                "preprocess_config"
115
            ]:
116
                arguments.append(f"--{k}={v}")
1✔
117
        return arguments
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc