• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

simonsobs / so_campaign_manager / 16810273523

07 Aug 2025 04:32PM UTC coverage: 74.718% (+1.5%) from 73.244%
16810273523

Pull #67

github

web-flow
Merge 797495e3d into 4aff68c8a
Pull Request #67: feat: far close sun/moon

173 of 246 branches covered (70.33%)

Branch coverage included in aggregate %.

131 of 139 new or added lines in 3 files covered. (94.24%)

1618 of 2151 relevant lines covered (75.22%)

0.75 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.0
/src/socm/workflows/ml_null_tests/moon_close_null_test.py
1
from datetime import datetime, timedelta, timezone
1✔
2
from typing import Dict, List, Optional, Union
1✔
3

4
import astropy.units as u
1✔
5
import numpy as np
1✔
6
from astral import LocationInfo
1✔
7
from astropy.coordinates import AltAz, EarthLocation, SkyCoord, get_body
1✔
8
from astropy.time import Time
1✔
9
from pydantic import PrivateAttr
1✔
10
from sotodlib.core import Context
1✔
11

12
from socm.workflows.ml_null_tests import NullTestWorkflow
1✔
13

14

15
class MoonCloseFarNullTestWorkflow(NullTestWorkflow):
    """
    A workflow for Moon close/far null tests.

    Observations are grouped by whether their pointing centre
    (``az_center``/``el_center`` at ``start_time``) lies close to or far from
    the Moon. Each group is then divided into ``nsplits`` (=2) time-interleaved
    splits.
    """

    chunk_nobs: Optional[int] = None
    chunk_duration: Optional[timedelta] = None
    nsplits: int = 2  # Fixed to 2 as specified in the issue
    name: str = "moon_close_far_null_test_workflow"
    # In degrees. NOTE: despite the "sun" in its name (kept so existing
    # configurations keep working), this threshold is compared against the
    # angular separation from the Moon.
    sun_distance_threshold: float = 10.0

    # Per-telescope radius, in degrees, added to the threshold above; it is
    # looked up with ``self.site``.
    _field_view_radius_per_telescope: Dict[str, float] = PrivateAttr(
        {"sat": 1.0, "act": 1.0, "lat": 1.0}
    )

    def _get_splits(
        self, ctx: Context, obs_info: Dict[str, Dict[str, Union[float, str]]]
    ) -> Dict[str, List[List[str]]]:
        """
        Distribute the observations across splits based on Moon proximity.

        Groups observations by whether their pointing centre is close to or far
        from the Moon and then creates time-interleaved splits for each group
        with nsplits=2.

        Args:
            ctx: Context object (not used by this method).
            obs_info: Dictionary mapping obs_id to observation metadata. The
                keys ``start_time`` (unix timestamp), ``az_center`` and
                ``el_center`` (degrees) are read from each entry.

        Returns:
            Dict mapping 'close' and/or 'far' to a list of splits, where each
            split is a list of obs_ids. A group without observations is omitted.

        Raises:
            ValueError: If neither or both of chunk_nobs and chunk_duration are set.
            NotImplementedError: If only chunk_duration is set.
        """
        if self.chunk_nobs is None and self.chunk_duration is None:
            raise ValueError("Either chunk_nobs or duration must be set.")
        elif self.chunk_nobs is not None and self.chunk_duration is not None:
            raise ValueError("Only one of chunk_nobs or duration can be set.")
        elif self.chunk_nobs is None:
            # Decide the chunk size based on the duration
            raise NotImplementedError(
                "Splitting by duration is not implemented yet. Please set chunk_nobs."
            )

        # The observatory location does not depend on the observation, so it is
        # built once instead of once per loop iteration.
        city = LocationInfo(
            "San Pedro de Atacama", "Chile", "America/Santiago", -22.91, -68.2
        )
        alt = 5190  # Altitude in meters
        location = EarthLocation(
            lat=city.latitude * u.deg, lon=city.longitude * u.deg, height=alt * u.m
        )

        # Group observations by Moon proximity
        moon_position_splits = {"close": [], "far": []}
        for obs_id, obs_meta in obs_info.items():
            obs_time = Time(obs_meta["start_time"], format="unix", scale="utc")

            altaz = AltAz(obstime=obs_time, location=location)
            pointing = SkyCoord(
                az=obs_meta["az_center"] * u.deg,
                alt=obs_meta["el_center"] * u.deg,
                frame=altaz,
            )

            # Express the Moon in the observer's AltAz frame before measuring
            # the separation. Measuring it from an ICRS coordinate instead
            # (``icrs_coord.separation(moon)``) yields the angle as seen from
            # the solar-system barycentre, which is badly wrong for a body as
            # nearby as the Moon.
            moon = get_body("moon", obs_time, location=location).transform_to(altaz)

            # Compute angular separation as seen from the observatory
            separation = pointing.separation(moon)

            if separation.deg <= (
                self.sun_distance_threshold
                + self._field_view_radius_per_telescope[self.site]
            ):
                moon_position_splits["close"].append(obs_id)
            else:
                moon_position_splits["far"].append(obs_id)

        final_splits = {}

        # For each Moon-proximity group, create time-interleaved splits
        for moon_position, obs_ids in moon_position_splits.items():
            if not obs_ids:
                continue

            # Sort by timestamp for time-based splitting
            sorted_ids = sorted(obs_ids, key=lambda k: obs_info[k]["start_time"])

            # NOTE: np.array_split divides the list into ``chunk_nobs`` nearly
            # equal chunks, i.e. chunk_nobs acts as the number of chunks here,
            # not as the number of observations per chunk.
            obs_lists = np.array_split(sorted_ids, self.chunk_nobs)

            # Create nsplits (=2) time-interleaved splits
            splits = [[] for _ in range(self.nsplits)]
            for i, obs_list in enumerate(obs_lists):
                splits[i % self.nsplits] += obs_list.tolist()

            final_splits[moon_position] = splits

        return final_splits

    @classmethod
    def get_workflows(cls, desc=None) -> List[NullTestWorkflow]:
        """
        Create a list of NullTestWorkflow instances from the provided description.

        One workflow is created per (moon position, split) pair, named
        ``moon_{position}_split_{n}_null_test_workflow`` with ``n`` starting at 1
        and written to ``{output_dir}/moon_{position}_split_{n}``.

        Args:
            desc: Mapping of keyword arguments used to construct this workflow.
                It is unpacked with ``**``, so the ``None`` default is not
                usable and raises TypeError.

        Returns:
            The generated NullTestWorkflow instances, each restricted to the
            obs_ids of its split through an ``obs_id IN (...)`` query.
        """
        moon_position_workflow = cls(**desc)

        workflows = []
        # NOTE(review): ``_splits`` is not set in this class; presumably the
        # parent class fills it from ``_get_splits`` - confirm.
        for moon_position, position_splits in moon_position_workflow._splits.items():
            for split_no, split in enumerate(position_splits, start=1):
                label = f"moon_{moon_position}_split_{split_no}"
                quoted_ids = ",".join(f"'{oid}'" for oid in split)

                desc_copy = moon_position_workflow.model_dump(exclude_unset=True)
                desc_copy["name"] = f"{label}_null_test_workflow"
                desc_copy["datasize"] = 0
                desc_copy["query"] = f"obs_id IN ({quoted_ids})"
                desc_copy["chunk_nobs"] = 1
                desc_copy["output_dir"] = (
                    f"{moon_position_workflow.output_dir}/{label}"
                )

                workflows.append(NullTestWorkflow(**desc_copy))

        return workflows
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc