• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

winter-telescope / winterdrp / 4119519683

pending completion
4119519683

push

github

GitHub
Lintify (#287)

36 of 36 new or added lines in 9 files covered. (100.0%)

5321 of 6332 relevant lines covered (84.03%)

1.68 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.91
/winterdrp/processors/base_processor.py
1
"""
2
Module containing the :class:`~wintedrp.processors.BaseProcessor`
3
"""
4
import datetime
2✔
5
import getpass
2✔
6
import hashlib
2✔
7
import logging
2✔
8
import socket
2✔
9
import threading
2✔
10
from abc import ABC
2✔
11
from pathlib import Path
2✔
12
from queue import Queue
2✔
13
from threading import Thread
2✔
14

15
import numpy as np
2✔
16

17
from winterdrp.data import DataBatch, Dataset, Image, ImageBatch, SourceBatch
2✔
18
from winterdrp.errors import (
2✔
19
    ErrorReport,
20
    ErrorStack,
21
    NoncriticalProcessingError,
22
    ProcessorError,
23
)
24
from winterdrp.io import open_fits, save_to_path
2✔
25
from winterdrp.paths import (
2✔
26
    BASE_NAME_KEY,
27
    CAL_OUTPUT_SUB_DIR,
28
    LATEST_SAVE_KEY,
29
    LATEST_WEIGHT_SAVE_KEY,
30
    PROC_HISTORY_KEY,
31
    RAW_IMG_KEY,
32
    get_output_path,
33
    get_weight_path,
34
    max_n_cpu,
35
    package_name,
36
)
37

38
logger = logging.getLogger(__name__)
2✔
39

40

41
class PrerequisiteError(ProcessorError):
    """
    Raised when a processor depends on another processor as a prerequisite,
    but that prerequisite is absent from the chain
    """
46

47

48
class NoCandidatesError(ProcessorError):
    """
    Raised when a :class:`~wintedrp.processors.CandidateGenerator` yields
    an empty set of candidates
    """
53

54

55
class BaseProcessor:
    """
    Base processor class, to be inherited from for all processors
    """

    @property
    def base_key(self):
        """
        Unique key for the processor, to be used e.g in processing history tracking

        :return: None
        """
        raise NotImplementedError

    # Cap on the number of worker threads spawned by base_apply
    max_n_cpu: int = max_n_cpu

    # Shared registry of subclasses, keyed by base_key
    # (populated automatically by __init_subclass__)
    subclasses = {}

    def __init__(self):
        self.night = None
        self.night_sub_dir = None
        self.preceding_steps = None
        # Per-thread caches keyed by threading.get_ident(),
        # filled by base_apply and emptied again by clean_cache
        self.passed_dataset = {}
        self.err_stack = {}

    @classmethod
    def __init_subclass__(cls, **kwargs):
        """
        Automatically register each new subclass in the shared registry
        """
        super().__init_subclass__(**kwargs)
        cls.subclasses[cls.base_key] = cls

    def set_preceding_steps(self, previous_steps: list):
        """
        Provides processor with the list of preceding processors, and saves this

        :param previous_steps: list of processors
        :return: None
        """
        self.preceding_steps = previous_steps

    def set_night(self, night_sub_dir: str | int = ""):
        """
        Sets the night subdirectory for the processor to read/write data

        :param night_sub_dir: String/int subdirectory for night
        :return: None
        """
        self.night_sub_dir = night_sub_dir
        # Coerce to str first: the signature accepts an int, which has no
        # .split method and previously raised AttributeError here
        self.night = str(night_sub_dir).split("/")[-1]

    def generate_error_report(
        self, exception: Exception, batch: DataBatch
    ) -> ErrorReport:
        """
        Generates an error report based on a python Exception

        :param exception: exception raised
        :param batch: batch which generated exception
        :return: error report
        """
        return ErrorReport(exception, self.__module__, batch.get_raw_image_names())

    def update_dataset(self, dataset: Dataset) -> Dataset:
        """
        Update a dataset after processing

        :param dataset: Initial dataset
        :return: Updated dataset
        """
        return dataset

    def check_prerequisites(
        self,
    ):
        """
        Check to see if any prerequisite processors are missing

        :return: None
        """

    def clean_cache(self, cache_id: int):
        """
        Function to clean the internal cache filled by base_apply

        :param cache_id: key for cache
        :return: None
        """
        del self.passed_dataset[cache_id]
        del self.err_stack[cache_id]

    def base_apply(self, dataset: Dataset) -> tuple[Dataset, ErrorStack]:
        """
        Core function to act on a dataset, and return an updated dataset

        :param dataset: Input dataset
        :return: Updated dataset, and any caught errors
        """
        # Cache results per calling thread, so concurrent calls don't collide
        cache_id = threading.get_ident()

        self.passed_dataset[cache_id] = Dataset()
        self.err_stack[cache_id] = ErrorStack()

        if len(dataset) > 0:
            # Never spawn more workers than there are batches to process
            n_cpu = min(self.max_n_cpu, len(dataset))

            watchdog_queue = Queue()

            workers = []

            for _ in range(n_cpu):
                # Set up a worker thread to process database load
                worker = Thread(
                    target=self.apply_to_batch, args=(watchdog_queue, cache_id)
                )
                # Daemon threads die with the main process, so the
                # (deliberately unterminated) workers cannot block exit
                worker.daemon = True
                worker.start()

                workers.append(worker)

            for batch in dataset:
                watchdog_queue.put(item=batch)

            # Block until every queued batch has been marked task_done
            watchdog_queue.join()

        dataset = self.update_dataset(self.passed_dataset[cache_id])
        err_stack = self.err_stack[cache_id]

        self.clean_cache(cache_id=cache_id)

        return dataset, err_stack

    def apply_to_batch(self, queue, cache_id: int):
        """
        Function to run self.apply on a batch in the queue, catch any errors, and then
        update the internal cache with the results.

        :param queue: python threading queue
        :param cache_id: key for cache
        :return: None
        """
        while True:
            batch = queue.get()
            try:
                batch = self.apply(batch)
                self.passed_dataset[cache_id].append(batch)
            except NoncriticalProcessingError as exc:
                # Non-critical: record the error but keep the batch in the output
                err = self.generate_error_report(exc, batch)
                logger.error(err.generate_log_message())
                self.err_stack[cache_id].add_report(err)
                self.passed_dataset[cache_id].append(batch)
            except Exception as exc:  # pylint: disable=broad-except
                # Critical: record the error and drop the batch
                err = self.generate_error_report(exc, batch)
                logger.error(err.generate_log_message())
                self.err_stack[cache_id].add_report(err)
            queue.task_done()

    def apply(self, batch: DataBatch):
        """
        Function applying the processor to a
        :class:`~winterdrp.data.base_data.DataBatch`.
        Also updates the processing history.

        :param batch: input data batch
        :return: updated data batch
        """
        batch = self._apply(batch)
        batch = self._update_processing_history(batch)
        return batch

    def _apply(self, batch: DataBatch) -> DataBatch:
        """
        Core function to update the :class:`~winterdrp.data.base_data.DataBatch`

        :param batch: Input data batch
        :return: updated data batch
        """
        raise NotImplementedError

    def _update_processing_history(
        self,
        batch: DataBatch,
    ) -> DataBatch:
        """
        Function to update the processing history of each
        :class:`~winterdrp.data.base_data.DataBlock` object in a
        :class:`~winterdrp.data.base_data.DataBatch`.

        :param batch: Input data batch
        :return: Updated data batch
        """
        for i, data_block in enumerate(batch):
            data_block[PROC_HISTORY_KEY] += self.base_key + ","
            data_block["REDUCER"] = getpass.getuser()
            data_block["REDMACH"] = socket.gethostname()
            data_block["REDTIME"] = str(datetime.datetime.now())
            data_block["REDSOFT"] = package_name
            batch[i] = data_block
        return batch
253

254

255
class CleanupProcessor(BaseProcessor, ABC):
    """
    Processor which 'cleans up' by deleting empty batches
    """

    def update_dataset(self, dataset: Dataset) -> Dataset:
        """
        Filter the dataset down to only the batches that still contain data

        :param dataset: dataset after processing
        :return: dataset with empty batches removed
        """
        non_empty = [batch for batch in dataset.get_batches() if len(batch) > 0]
        return Dataset(non_empty)
264

265

266
class ImageHandler:
    """
    Base class for handling images
    """

    @staticmethod
    def open_fits(path: str | Path) -> Image:
        """
        Opens a fits file, and returns an Image object

        :param path: Path of image
        :return: Image object
        """
        path_str = str(path)
        data, header = open_fits(path_str)
        # Backfill provenance keys on first load, if they are not yet set
        if BASE_NAME_KEY not in header:
            header[BASE_NAME_KEY] = Path(path_str).name
        if RAW_IMG_KEY not in header:
            header[RAW_IMG_KEY] = path_str
        return Image(data=data, header=header)

    @staticmethod
    def save_fits(
        image: Image,
        path: str | Path,
    ):
        """
        Save an Image to path

        :param image: Image to save
        :param path: path
        :return: None
        """
        save_path = str(path)
        header = image.get_header()
        if header is not None:
            # Track where this image was most recently written
            header[LATEST_SAVE_KEY] = save_path
        logger.info(f"Saving to {save_path}")
        save_to_path(image.get_data(), header, save_path)

    def save_weight_image(self, image: Image, img_path: Path) -> Path:
        """
        Saves a weight image

        :param image: Weight image
        :param img_path: Path of parent image
        :return: Path of weight image
        """
        weight_path = get_weight_path(img_path)
        header = image.get_header()
        header[LATEST_WEIGHT_SAVE_KEY] = str(weight_path)
        # Weight is 1.0 for valid pixels, 0.0 wherever the data is NaN
        weight_data = np.logical_not(np.isnan(image.get_data())).astype(float)
        self.save_fits(Image(weight_data, header), weight_path)
        return weight_path

    @staticmethod
    def get_hash(image_batch: ImageBatch):
        """
        Get a unique hash for an image batch

        :param image_batch: image batch
        :return: unique hash for that batch
        """
        labels = [img[BASE_NAME_KEY] + img[PROC_HISTORY_KEY] for img in image_batch]
        combined = "".join(sorted(labels))
        return hashlib.sha1(combined.encode()).hexdigest()
335

336

337
class BaseImageProcessor(BaseProcessor, ImageHandler, ABC):
    """
    Base processor handling images in/images out
    """

    def _apply(self, batch: ImageBatch) -> ImageBatch:
        """
        Delegate processing to the image-specific hook

        :param batch: input image batch
        :return: processed image batch
        """
        return self._apply_to_images(batch)

    def _apply_to_images(
        self,
        batch: ImageBatch,
    ) -> ImageBatch:
        """
        Image-processing hook, to be implemented by subclasses

        :param batch: input image batch
        :return: processed image batch
        """
        raise NotImplementedError
350

351

352
class ProcessorWithCache(BaseImageProcessor, ABC):
    """
    Image processor with cached images associated to it, e.g a master flat
    """

    def __init__(
        self,
        try_load_cache: bool = True,
        write_to_cache: bool = True,
        overwrite: bool = True,
        cache_sub_dir: str = CAL_OUTPUT_SUB_DIR,
    ):
        super().__init__()
        self.try_load_cache = try_load_cache
        self.write_to_cache = write_to_cache
        self.overwrite = overwrite
        self.cache_sub_dir = cache_sub_dir

    def select_cache_images(self, images: ImageBatch) -> ImageBatch:
        """
        Select the appropriate cached image for the batch

        :param images: images to process
        :return: cached images to use
        """
        raise NotImplementedError

    def get_cache_path(self, images: ImageBatch) -> Path:
        """
        Gets path for saving/loading cached image

        :param images: images to process
        :return: cache path
        """

        file_name = self.get_cache_file_name(images)

        output_path = get_output_path(
            base_name=file_name, dir_root=self.cache_sub_dir, sub_dir=self.night_sub_dir
        )

        output_path.parent.mkdir(parents=True, exist_ok=True)

        return output_path

    def get_cache_file_name(self, images: ImageBatch) -> str:
        """
        Get unique cache name for images

        :param images: images to process
        :return: unique hashed name
        """
        cache_images = self.select_cache_images(images)
        return f"{self.base_key}_{self.get_hash(cache_images)}.fits"

    def get_cache_file(self, images: ImageBatch) -> Image:
        """
        Return the appropriate cached image for the batch

        :param images: images to process
        :return: cached image to use
        """

        path = self.get_cache_path(images)

        exists = path.exists()

        # Plain boolean `and` instead of np.logical_and, which needlessly
        # routed two python bools through numpy
        if self.try_load_cache and exists:
            logger.info(f"Loading cached file {path}")
            return self.open_fits(path)

        image = self.make_image(images)

        if self.write_to_cache:
            # Save if the file is missing, or if overwriting is enabled
            # (replaces the obscure np.sum([not exists, overwrite]) > 0 check)
            if (not exists) or self.overwrite:
                self.save_fits(image, path)

        return image

    def make_image(self, images: ImageBatch) -> Image:
        """
        Make a cached image (e.g master flat)

        :param images: images to use
        :return: cached image
        """
        raise NotImplementedError
439

440

441
class ProcessorPremadeCache(ProcessorWithCache, ABC):
    """
    Processor with pre-made master image
    """

    def __init__(self, master_image_path: str | Path, *args, **kwargs):
        """
        :param master_image_path: location of the pre-made master image
        """
        super().__init__(*args, **kwargs)
        self.master_image_path = Path(master_image_path)

    def get_cache_path(self, images: ImageBatch) -> Path:
        """
        Always resolve the cache to the pre-made master image

        :param images: images to process (unused here)
        :return: path of the master image
        """
        return self.master_image_path
452

453

454
class BaseCandidateGenerator(BaseProcessor, ImageHandler, ABC):
    """
    Base CandidateGenerator processor (image batch in, source batch out)
    """

    @classmethod
    def __init_subclass__(cls, **kwargs):
        """
        Register each new subclass in the shared processor registry
        """
        super().__init_subclass__(**kwargs)
        cls.subclasses[cls.base_key] = cls

    def _apply(self, batch: ImageBatch) -> SourceBatch:
        """
        Extract sources from the image batch, failing loudly if none are found

        :param batch: input image batch
        :return: batch of extracted sources
        :raises NoCandidatesError: if no sources were extracted
        """
        source_batch = self._apply_to_images(batch)

        # Guard clause: an empty result counts as a hard failure
        if len(source_batch) > 0:
            return source_batch

        err = "No sources found in image batch"
        logger.error(err)
        raise NoCandidatesError(err)

    def _apply_to_images(self, batch: ImageBatch) -> SourceBatch:
        """
        Source-extraction hook, to be implemented by subclasses

        :param batch: input image batch
        :return: batch of extracted sources
        """
        raise NotImplementedError
476

477

478
class BaseDataframeProcessor(BaseProcessor, ABC):
    """
    Base dataframe processor (Source batch in, source batch out)
    """

    @classmethod
    def __init_subclass__(cls, **kwargs):
        """
        Register each new subclass in the shared processor registry
        """
        super().__init_subclass__(**kwargs)
        cls.subclasses[cls.base_key] = cls

    def _apply(self, batch: SourceBatch) -> SourceBatch:
        """
        Delegate processing to the candidate-specific hook

        :param batch: input source batch
        :return: processed source batch
        """
        return self._apply_to_candidates(batch)

    def _apply_to_candidates(
        self,
        batch: SourceBatch,
    ) -> SourceBatch:
        """
        Candidate-processing hook, to be implemented by subclasses

        :param batch: input source batch
        :return: processed source batch
        """
        raise NotImplementedError
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc