winter-telescope / winterdrp / build 3596924942 (pending completion)

GitHub Pull Request #224: Code with Style
Merge 0ea201d54 into 00fbdf6f7

1490 of 1490 new or added lines in 93 files covered (100.0%)
4571 of 6109 relevant lines covered (74.82%)
0.75 hits per line

Source file: /winterdrp/processors/base_processor.py (96.86% covered)
"""
Base classes for winterdrp data processors.
"""
import copy
import datetime
import getpass
import hashlib
import logging
import os
import socket
from abc import ABC
from pathlib import Path
from queue import Queue
from threading import Thread

import astropy.io.fits
import numpy as np

from winterdrp.data import DataBatch, Dataset, Image, ImageBatch, SourceBatch
from winterdrp.errors import (
    ErrorReport,
    ErrorStack,
    NoncriticalProcessingError,
    ProcessorError,
)
from winterdrp.io import open_fits, save_to_path
from winterdrp.paths import (
    base_name_key,
    cal_output_sub_dir,
    get_mask_path,
    get_output_path,
    latest_mask_save_key,
    latest_save_key,
    max_n_cpu,
    package_name,
    proc_history_key,
    raw_img_key,
)

logger = logging.getLogger(__name__)


class PrerequisiteError(ProcessorError):
    """Error raised when a processor's prerequisites are not met."""


class NoCandidatesError(ProcessorError):
    """Error raised when a candidate generator finds no sources."""


class BaseDPU:
    def base_apply(self, dataset: Dataset) -> tuple[Dataset, ErrorStack]:
        raise NotImplementedError()

    def generate_error_report(
        self, exception: Exception, batch: DataBatch
    ) -> ErrorReport:
        return ErrorReport(exception, self.__module__, batch.get_raw_image_names())


class BaseProcessor(BaseDPU):
    @property
    def base_key(self):
        raise NotImplementedError

    max_n_cpu: int = max_n_cpu

    # Registry of processor classes, keyed by base_key
    subclasses = {}

    def __init__(self, *args, **kwargs):
        self.night = None
        self.night_sub_dir = None
        self.preceding_steps = None
        self.passed_dataset = self.err_stack = None

    @classmethod
    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        cls.subclasses[cls.base_key] = cls
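
    # Registration example (hypothetical): defining a concrete subclass with a
    # plain-string base_key adds it to the registry at class-creation time:
    #
    #     class ExampleProcessor(BaseProcessor):
    #         base_key = "example"
    #
    #     BaseProcessor.subclasses["example"] is ExampleProcessor  # True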

    def set_preceding_steps(self, previous_steps: list):
        self.preceding_steps = previous_steps

    def set_night(self, night_sub_dir: str | int = ""):
        self.night_sub_dir = night_sub_dir
        # Cast to str so that integer night IDs survive the split
        self.night = str(night_sub_dir).split("/")[-1]

    @staticmethod
    def update_dataset(dataset: Dataset) -> Dataset:
        return dataset

    def check_prerequisites(self):
        pass

    def clean_cache(self):
        self.passed_dataset = self.err_stack = None

    def base_apply(self, dataset: Dataset) -> tuple[Dataset, ErrorStack]:
        self.passed_dataset = Dataset()
        self.err_stack = ErrorStack()

        if len(dataset) > 0:
            n_cpu = min(self.max_n_cpu, len(dataset))

            watchdog_queue = Queue()

            workers = []

            for i in range(n_cpu):
                # Set up a daemon worker thread to process batches from the queue
                worker = Thread(target=self.apply_to_batch, args=(watchdog_queue,))
                worker.daemon = True
                worker.start()

                workers.append(worker)

            for batch in dataset:
                watchdog_queue.put(item=batch)

            watchdog_queue.join()

        dataset = self.update_dataset(self.passed_dataset)
        err_stack = self.err_stack

        self.clean_cache()

        return dataset, err_stack

    def apply_to_batch(self, q: Queue):
        while True:
            batch = q.get()
            try:
                batch = self.apply(batch)
                self.passed_dataset.append(batch)
            except NoncriticalProcessingError as e:
                err = self.generate_error_report(e, batch)
                logger.error(err.generate_log_message())
                self.err_stack.add_report(err)
                # Non-critical errors are logged, but the batch stays in the pipeline
                self.passed_dataset.append(batch)
            except Exception as e:
                err = self.generate_error_report(e, batch)
                logger.error(err.generate_log_message())
                self.err_stack.add_report(err)
            q.task_done()

    def apply(self, batch: DataBatch):
        batch = self._apply(batch)
        batch = self._update_processing_history(batch)
        return batch

    def _apply(self, batch: DataBatch):
        raise NotImplementedError

    def _update_processing_history(
        self,
        batch: DataBatch,
    ) -> DataBatch:
        # Stamp each data block with provenance: processing chain, user,
        # host, time and software package
        for i, data_block in enumerate(batch):
            data_block[proc_history_key] += self.base_key + ","
            data_block["REDUCER"] = getpass.getuser()
            data_block["REDMACH"] = socket.gethostname()
            data_block["REDTIME"] = str(datetime.datetime.now())
            data_block["REDSOFT"] = package_name
            batch[i] = data_block
        return batch

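The pattern above is the whole contract: subclasses override _apply, while
base_apply handles the threading, provenance stamping, and error collection. A
minimal usage sketch, assuming a populated Dataset; the ExampleStamper class
and its key are hypothetical:

    class ExampleStamper(BaseProcessor):
        base_key = "stamp_example"  # hypothetical key

        def _apply(self, batch: DataBatch) -> DataBatch:
            for i, block in enumerate(batch):
                block["EXAMPLE"] = "done"  # header access, as in _update_processing_history
                batch[i] = block
            return batch

    processor = ExampleStamper()
    processor.set_night("20230101")
    processed, err_stack = processor.base_apply(dataset)  # dataset: a winterdrp Dataset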


class CleanupProcessor(BaseProcessor, ABC):
    def update_dataset(self, dataset: Dataset) -> Dataset:
        # Remove empty batches from the dataset
        new_dataset = Dataset([x for x in dataset.get_batches() if len(x) > 0])
        return new_dataset


class ImageHandler:
    @staticmethod
    def open_fits(path: str | Path) -> Image:
        path = str(path)
        data, header = open_fits(path)
        if raw_img_key not in header:
            header[raw_img_key] = path
        if base_name_key not in header:
            header[base_name_key] = Path(path).name
        return Image(data=data, header=header)

    @staticmethod
    def save_fits(
        image: Image,
        path: str | Path,
    ):
        path = str(path)
        data = image.get_data()
        header = image.get_header()
        if header is not None:
            header[latest_save_key] = path
        logger.info(f"Saving to {path}")
        save_to_path(data, header, path)

    def save_mask(self, image: Image, img_path: str) -> str:
        data = image.get_data()
        # Mask is 1.0 where the data is valid, 0.0 where it is NaN
        mask = (~np.isnan(data)).astype(float)
        mask_path = get_mask_path(img_path)
        header = image.get_header()
        header[latest_mask_save_key] = mask_path
        self.save_fits(Image(mask, header), mask_path)
        return mask_path

    @staticmethod
    def get_hash(image_batch: ImageBatch):
        # Deterministic hash built from the sorted image names and histories
        key = "".join(
            sorted([x[base_name_key] + x[proc_history_key] for x in image_batch])
        )
        return hashlib.sha1(key.encode()).hexdigest()
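
A short round-trip sketch of the handler's helpers; the file path is
hypothetical:

    handler = ImageHandler()
    image = handler.open_fits("/data/raw/example_0001.fits")  # hypothetical path
    mask_path = handler.save_mask(image, "/data/raw/example_0001.fits")
    logger.info(f"Mask written to {mask_path}")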


class BaseImageProcessor(BaseProcessor, ImageHandler, ABC):
    def _apply(self, batch: ImageBatch) -> ImageBatch:
        return self._apply_to_images(batch)

    def _apply_to_images(
        self,
        batch: ImageBatch,
    ) -> ImageBatch:
        raise NotImplementedError
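
Concrete image processors override _apply_to_images. A minimal sketch; the
ExampleScaler class and its key are hypothetical:

    class ExampleScaler(BaseImageProcessor):
        base_key = "scale_example"  # hypothetical key

        def _apply_to_images(self, batch: ImageBatch) -> ImageBatch:
            for i, image in enumerate(batch):
                # Build a new Image, as ImageHandler.open_fits does
                batch[i] = Image(data=image.get_data() * 2.0, header=image.get_header())
            return batch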


class ProcessorWithCache(BaseImageProcessor, ABC):
    def __init__(
        self,
        try_load_cache: bool = True,
        write_to_cache: bool = True,
        overwrite: bool = True,
        cache_sub_dir: str = cal_output_sub_dir,
        *args,
        **kwargs,
    ):
        super().__init__(*args, **kwargs)
        self.try_load_cache = try_load_cache
        self.write_to_cache = write_to_cache
        self.overwrite = overwrite
        self.cache_sub_dir = cache_sub_dir

    def select_cache_images(self, images: ImageBatch) -> ImageBatch:
        raise NotImplementedError

    def get_cache_path(self, images: ImageBatch) -> str:
        file_name = self.get_cache_file_name(images)

        output_path = get_output_path(
            base_name=file_name, dir_root=self.cache_sub_dir, sub_dir=self.night_sub_dir
        )

        # Create the parent directory if it does not already exist
        os.makedirs(os.path.dirname(output_path), exist_ok=True)

        return output_path

    def get_cache_file_name(self, images: ImageBatch) -> str:
        cache_images = self.select_cache_images(images)
        return f"{self.base_key}_{self.get_hash(cache_images)}.fits"

    def get_cache_file(self, images: ImageBatch) -> Image:
        path = self.get_cache_path(images)

        exists = os.path.exists(path)

        if self.try_load_cache and exists:
            logger.info(f"Loading cached file {path}")
            return self.open_fits(path)

        image = self.make_image(images)

        if self.write_to_cache and ((not exists) or self.overwrite):
            self.save_fits(image, path)

        return image

    def make_image(self, images: ImageBatch) -> Image:
        raise NotImplementedError
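
A cache-backed processor needs only select_cache_images and make_image; the
hash of the selected images then keys the cache file. A sketch with a
hypothetical class and key, assuming ImageBatch supports integer indexing as
used elsewhere in this module:

    class MedianStackCache(ProcessorWithCache):  # hypothetical
        base_key = "median_example"

        def select_cache_images(self, images: ImageBatch) -> ImageBatch:
            return images  # hash the whole batch

        def make_image(self, images: ImageBatch) -> Image:
            stack = np.nanmedian([img.get_data() for img in images], axis=0)
            return Image(data=stack, header=images[0].get_header())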


class ProcessorPremadeCache(ProcessorWithCache, ABC):
    def __init__(self, master_image_path: str, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.master_image_path = master_image_path

    def get_cache_path(self, images: ImageBatch) -> str:
        # Always point at the user-supplied master image, ignoring the hash
        return self.master_image_path
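
Because get_cache_path is fixed, subclasses of ProcessorPremadeCache only hit
make_image when the master file is missing. A sketch, with a hypothetical
class and path:

    class PremadeFlat(ProcessorPremadeCache):  # hypothetical
        base_key = "premade_flat_example"

        def make_image(self, images: ImageBatch) -> Image:
            raise PrerequisiteError("Expected master image on disk")

    loader = PremadeFlat(master_image_path="/data/cals/master_flat.fits")  # hypothetical path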


class BaseCandidateGenerator(BaseProcessor, ImageHandler, ABC):
    @classmethod
    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        cls.subclasses[cls.base_key] = cls

    def _apply(self, batch: ImageBatch) -> SourceBatch:
        source_batch = self._apply_to_images(batch)

        if len(source_batch) == 0:
            err = "No sources found in image batch"
            logger.error(err)
            raise NoCandidatesError(err)

        return source_batch

    def _apply_to_images(self, batch: ImageBatch) -> SourceBatch:
        raise NotImplementedError
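
_apply enforces the contract that detection yields at least one source, so an
empty result surfaces as a NoCandidatesError rather than an empty batch
propagating silently. Sketch of a conforming subclass; the class is
hypothetical and a no-argument SourceBatch() constructor is assumed by analogy
with Dataset():

    class ExampleDetector(BaseCandidateGenerator):  # hypothetical
        base_key = "detect_example"

        def _apply_to_images(self, batch: ImageBatch) -> SourceBatch:
            sources = SourceBatch()  # assumed no-arg constructor
            # ... detection would append sources here ...
            return sources  # if left empty, _apply raises NoCandidatesError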


class BaseDataframeProcessor(BaseProcessor, ABC):
    @classmethod
    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        cls.subclasses[cls.base_key] = cls

    def _apply(self, batch: SourceBatch) -> SourceBatch:
        return self._apply_to_candidates(batch)

    def _apply_to_candidates(
        self,
        source_list: SourceBatch,
    ) -> SourceBatch:
        raise NotImplementedError