• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

winter-telescope / winterdrp / 3596687365

pending completion
3596687365

push

github

Robert Stein
Add black

4582 of 6121 relevant lines covered (74.86%)

0.75 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.86
/winterdrp/processors/base_processor.py
1
import logging
1✔
2
from abc import ABC
1✔
3
import astropy.io.fits
1✔
4
import numpy as np
1✔
5
import os
1✔
6
import socket
1✔
7
import getpass
1✔
8
import datetime
1✔
9
import hashlib
1✔
10
from pathlib import Path
1✔
11
from threading import Thread
1✔
12
from queue import Queue
1✔
13
import copy
1✔
14

15
from winterdrp.io import save_to_path, open_fits
1✔
16
from winterdrp.paths import cal_output_sub_dir, get_mask_path, latest_save_key, latest_mask_save_key, get_output_path,\
1✔
17
    base_name_key, proc_history_key, raw_img_key, package_name, max_n_cpu
18
from winterdrp.errors import ErrorReport, ErrorStack, ProcessorError, NoncriticalProcessingError
1✔
19
from winterdrp.data import DataBatch, Dataset, Image, ImageBatch, SourceBatch
1✔
20

21

22
# Module-level logger, namespaced to this module per stdlib convention.
logger = logging.getLogger(__name__)
23

24

25
class PrerequisiteError(ProcessorError):
    """Processor error indicating an unmet prerequisite.

    NOTE(review): raise sites are not visible in this chunk — presumably
    thrown by ``check_prerequisites`` overrides; confirm at call sites.
    """

    pass
27

28

29
class NoCandidatesError(ProcessorError):
    """Raised when candidate generation finds no sources in an image batch."""

    pass
31

32

33
class BaseDPU:
    """Minimal interface for a data-processing unit.

    Concrete units implement :meth:`base_apply`, which consumes a
    :class:`Dataset` and returns the processed dataset together with an
    :class:`ErrorStack` of accumulated failures.
    """

    def base_apply(
            self,
            dataset: Dataset
    ) -> tuple[Dataset, ErrorStack]:
        """Process *dataset*; must be implemented by subclasses."""
        raise NotImplementedError()

    def generate_error_report(self, exception: Exception, batch: DataBatch) -> ErrorReport:
        """Wrap *exception* in an :class:`ErrorReport`.

        The report is tagged with this unit's module and the raw image
        names of the batch that was being processed when it failed.
        """
        raw_names = batch.get_raw_image_names()
        return ErrorReport(exception, self.__module__, raw_names)
43

44

45
class BaseProcessor(BaseDPU):
    """Base class for all pipeline processors.

    Subclasses implement :meth:`_apply` to transform a single
    :class:`DataBatch`; :meth:`base_apply` fans the batches of a
    :class:`Dataset` out to worker threads and collects the processed
    batches plus any errors raised along the way.
    """

    @property
    def base_key(self):
        """Unique string key identifying the processor type (set by subclasses)."""
        raise NotImplementedError

    # Upper bound on worker threads; default comes from winterdrp.paths.
    max_n_cpu: int = max_n_cpu

    # Registry mapping base_key -> processor class, filled by __init_subclass__.
    subclasses = {}

    def __init__(
            self,
            *args,
            **kwargs
    ):
        self.night = None
        self.night_sub_dir = None
        self.preceding_steps = None
        # Per-run state, populated in base_apply and cleared by clean_cache.
        self.passed_dataset = self.err_stack = None

    @classmethod
    def __init_subclass__(cls, **kwargs):
        """Register every subclass in the shared ``subclasses`` registry."""
        super().__init_subclass__(**kwargs)
        cls.subclasses[cls.base_key] = cls

    def set_preceding_steps(
            self,
            previous_steps: list
    ):
        """Record the processors that run before this one in the chain."""
        self.preceding_steps = previous_steps

    def set_night(
            self,
            night_sub_dir: str | int = ""
    ):
        """Set the night subdirectory and derive the night name from it.

        Fix: the annotation allows an int, but ``.split`` was called
        directly on the argument, raising AttributeError for int input;
        convert to str first.
        """
        self.night_sub_dir = night_sub_dir
        self.night = str(night_sub_dir).split("/")[-1]

    @staticmethod
    def update_dataset(
        dataset: Dataset
    ) -> Dataset:
        """Hook for post-processing the collected dataset; identity by default."""
        return dataset

    def check_prerequisites(
            self,
    ):
        """Hook for validating preceding steps; no-op by default."""
        pass

    def clean_cache(self):
        """Drop per-run state so it does not leak between base_apply calls."""
        self.passed_dataset = self.err_stack = None

    def base_apply(
            self,
            dataset: Dataset
    ) -> tuple[Dataset, ErrorStack]:
        """Apply this processor to every batch in *dataset* via worker threads.

        Returns the processed dataset (after :meth:`update_dataset`) and
        the stack of errors collected from failed batches.
        """
        self.passed_dataset = Dataset()
        self.err_stack = ErrorStack()

        if len(dataset) > 0:

            n_cpu = min(self.max_n_cpu, len(dataset))

            watchdog_queue = Queue()

            workers = []

            for _ in range(n_cpu):
                # Daemon workers pull batches off the queue until process
                # exit; queue.join() below provides the synchronisation.
                worker = Thread(target=self.apply_to_batch, args=(watchdog_queue,))
                worker.daemon = True
                worker.start()

                workers.append(worker)

            for batch in dataset:
                watchdog_queue.put(item=batch)

            # Block until every queued batch has been marked task_done.
            watchdog_queue.join()

        dataset = self.update_dataset(self.passed_dataset)
        err_stack = self.err_stack

        self.clean_cache()

        return dataset, err_stack

    def apply_to_batch(
            self,
            q
    ):
        """Worker loop: process batches from queue *q* until the thread dies.

        Noncritical errors are logged and the (unprocessed) batch is kept;
        any other exception is logged and the batch is dropped.
        """
        while True:
            batch = q.get()
            try:
                batch = self.apply(batch)
                self.passed_dataset.append(batch)
            except NoncriticalProcessingError as e:
                err = self.generate_error_report(e, batch)
                logger.error(err.generate_log_message())
                self.err_stack.add_report(err)
                # Keep the batch: the error is explicitly noncritical.
                self.passed_dataset.append(batch)
            except Exception as e:
                err = self.generate_error_report(e, batch)
                logger.error(err.generate_log_message())
                self.err_stack.add_report(err)
            q.task_done()

    def apply(self, batch: DataBatch):
        """Process *batch* and stamp its processing history."""
        batch = self._apply(batch)
        batch = self._update_processing_history(batch)
        return batch

    def _apply(self, batch: DataBatch):
        """Subclass hook performing the actual processing."""
        raise NotImplementedError

    def _update_processing_history(
            self,
            batch: DataBatch,
    ) -> DataBatch:
        """Append this processor's key and reduction metadata to each block."""
        for i, data_block in enumerate(batch):
            data_block[proc_history_key] += self.base_key + ","
            data_block["REDUCER"] = getpass.getuser()
            data_block["REDMACH"] = socket.gethostname()
            data_block["REDTIME"] = str(datetime.datetime.now())
            data_block["REDSOFT"] = package_name
            batch[i] = data_block
        return batch
174

175

176
class CleanupProcessor(BaseProcessor, ABC):
    """Processor variant that discards empty batches after processing."""

    def update_dataset(
        self,
        dataset: Dataset
    ) -> Dataset:
        """Return a new dataset containing only the non-empty batches."""
        non_empty = [batch for batch in dataset.get_batches() if len(batch) > 0]
        return Dataset(non_empty)
185

186

187
class ImageHandler:
    """Mixin providing shared FITS-image I/O helpers for processors."""

    @staticmethod
    def open_fits(
            path: str | Path
    ) -> Image:
        """Load a FITS file and wrap it as an :class:`Image`.

        Ensures the header records the raw image path and a base name,
        filling them in if absent.
        """
        path_str = str(path)
        data, header = open_fits(path_str)
        if raw_img_key not in header:
            header[raw_img_key] = path_str
        if base_name_key not in header:
            header[base_name_key] = Path(path_str).name
        return Image(data=data, header=header)

    @staticmethod
    def save_fits(
            image: Image,
            path: str | Path,
    ):
        """Write *image* to *path*, recording the save location in its header."""
        path = str(path)
        data = image.get_data()
        header = image.get_header()
        if header is not None:
            header[latest_save_key] = path
        logger.info(f"Saving to {path}")
        save_to_path(data, header, path)

    def save_mask(
            self,
            image: Image,
            img_path: str
    ) -> str:
        """Save a validity mask for *image* (1.0 where data is not NaN).

        Returns the path the mask was written to; the mask path is also
        recorded in the image's header.
        """
        pixel_data = image.get_data()
        mask = (~np.isnan(pixel_data)).astype(float)
        mask_path = get_mask_path(img_path)
        header = image.get_header()
        header[latest_mask_save_key] = mask_path
        self.save_fits(Image(mask, header), mask_path)
        return mask_path

    @staticmethod
    def get_hash(image: ImageBatch):
        """Order-independent SHA-1 digest of the batch's names and histories."""
        parts = sorted(x[base_name_key] + x[proc_history_key] for x in image)
        digest_key = "".join(parts)
        return hashlib.sha1(digest_key.encode()).hexdigest()
231

232

233
class BaseImageProcessor(BaseProcessor, ImageHandler, ABC):
    """Processor operating on image batches.

    ``_apply`` simply dispatches to the image-specific hook
    :meth:`_apply_to_images`, which subclasses implement.
    """

    def _apply(
            self,
            batch: ImageBatch
    ) -> ImageBatch:
        """Delegate batch processing to the image-specific hook."""
        processed = self._apply_to_images(batch)
        return processed

    def _apply_to_images(
            self,
            batch: ImageBatch,
    ) -> ImageBatch:
        """Subclass hook: transform an image batch."""
        raise NotImplementedError
246

247

248
class ProcessorWithCache(BaseImageProcessor, ABC):
    """Image processor that caches a derived image on disk.

    On first use the image is built via :meth:`make_image` and (optionally)
    written to the cache directory; subsequent calls can reload it from
    disk instead of recomputing it.
    """

    def __init__(
            self,
            try_load_cache: bool = True,
            write_to_cache: bool = True,
            overwrite: bool = True,
            cache_sub_dir: str = cal_output_sub_dir,
            *args,
            **kwargs
    ):
        """
        :param try_load_cache: if True, reuse an existing cached file
        :param write_to_cache: if True, write newly made images to the cache
        :param overwrite: if True, replace an existing cached file
        :param cache_sub_dir: root subdirectory for cached output
        """
        super().__init__(*args, **kwargs)
        self.try_load_cache = try_load_cache
        self.write_to_cache = write_to_cache
        self.overwrite = overwrite
        self.cache_sub_dir = cache_sub_dir

    def select_cache_images(self, images: ImageBatch) -> ImageBatch:
        """Subclass hook: choose which images determine the cache key."""
        raise NotImplementedError

    def get_cache_path(self, images: ImageBatch) -> str:
        """Build the cache file path for *images*, creating its directory.

        Fix: the directory was previously created inside a blanket
        ``except OSError: pass``, which also hid genuine failures such as
        permission errors; ``mkdir(parents=True, exist_ok=True)`` only
        tolerates the directory already existing.
        """
        file_name = self.get_cache_file_name(images)

        output_path = get_output_path(
            base_name=file_name,
            dir_root=self.cache_sub_dir,
            sub_dir=self.night_sub_dir
        )

        Path(output_path).parent.mkdir(parents=True, exist_ok=True)

        return output_path

    def get_cache_file_name(self, images: ImageBatch) -> str:
        """Deterministic cache file name from the processor key and image hash."""
        cache_images = self.select_cache_images(images)
        return f"{self.base_key}_{self.get_hash(cache_images)}.fits"

    def get_cache_file(self, images: ImageBatch) -> Image:
        """Return the cached image, loading or (re)making it as configured."""
        path = self.get_cache_path(images)

        exists = os.path.exists(path)

        # Fix: plain boolean logic instead of np.logical_and on two bools.
        if self.try_load_cache and exists:
            logger.info(f"Loading cached file {path}")
            return self.open_fits(path)

        image = self.make_image(images)

        # Fix: (not exists) or overwrite, rather than np.sum([...]) > 0.
        if self.write_to_cache and (not exists or self.overwrite):
            self.save_fits(image, path)

        return image

    def make_image(self, images: ImageBatch) -> Image:
        """Subclass hook: construct the image to be cached."""
        raise NotImplementedError
311

312

313
class ProcessorPremadeCache(ProcessorWithCache, ABC):
    """Cached processor whose 'cache' is a pre-made master image on disk."""

    def __init__(
            self,
            master_image_path: str,
            *args,
            **kwargs
    ):
        """
        :param master_image_path: path to the pre-made master image
        """
        super().__init__(*args, **kwargs)
        self.master_image_path = master_image_path

    def get_cache_path(self, images: ImageBatch) -> str:
        """Return the fixed master image path; *images* is ignored."""
        return self.master_image_path
326

327

328
class BaseCandidateGenerator(BaseProcessor, ImageHandler, ABC):
    """Processor that turns image batches into source (candidate) batches."""

    @classmethod
    def __init_subclass__(cls, **kwargs):
        """Register the subclass under its base_key."""
        super().__init_subclass__(**kwargs)
        cls.subclasses[cls.base_key] = cls

    def _apply(self, batch: ImageBatch) -> SourceBatch:
        """Generate sources from *batch*, raising if none are found."""
        sources = self._apply_to_images(batch)

        if len(sources) == 0:
            err = "No sources found in image batch"
            logger.error(err)
            raise NoCandidatesError(err)

        return sources

    def _apply_to_images(self, batch: ImageBatch) -> SourceBatch:
        """Subclass hook: extract sources from an image batch."""
        raise NotImplementedError
347

348

349
class BaseDataframeProcessor(BaseProcessor, ABC):
    """Processor operating on source batches rather than images."""

    @classmethod
    def __init_subclass__(cls, **kwargs):
        """Register the subclass under its base_key."""
        super().__init_subclass__(**kwargs)
        cls.subclasses[cls.base_key] = cls

    def _apply(
            self,
            batch: SourceBatch
    ) -> SourceBatch:
        """Delegate processing to the candidate-level hook."""
        processed = self._apply_to_candidates(batch)
        return processed

    def _apply_to_candidates(
            self,
            source_list: SourceBatch,
    ) -> SourceBatch:
        """Subclass hook: transform a batch of candidate sources."""
        raise NotImplementedError
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc