• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

winter-telescope / winterdrp / 3722291905

pending completion
3722291905

Pull #246

github

GitHub
Merge d7c0428bb into db3f40209
Pull Request #246: Separate cache per thread for multiprocessing

32 of 32 new or added lines in 3 files covered. (100.0%)

4640 of 6166 relevant lines covered (75.25%)

0.75 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.72
/winterdrp/processors/base_processor.py
1
"""
2
Module containing the :class:`~winterdrp.processors.BaseProcessor`
3
"""
4
import datetime
1✔
5
import getpass
1✔
6
import hashlib
1✔
7
import logging
1✔
8
import socket
1✔
9
import threading
1✔
10
from abc import ABC
1✔
11
from pathlib import Path
1✔
12
from queue import Queue
1✔
13
from threading import Thread
1✔
14

15
import numpy as np
1✔
16

17
from winterdrp.data import DataBatch, Dataset, Image, ImageBatch, SourceBatch
1✔
18
from winterdrp.errors import (
1✔
19
    ErrorReport,
20
    ErrorStack,
21
    NoncriticalProcessingError,
22
    ProcessorError,
23
)
24
from winterdrp.io import open_fits, save_to_path
1✔
25
from winterdrp.paths import (
1✔
26
    BASE_NAME_KEY,
27
    CAL_OUTPUT_SUB_DIR,
28
    LATEST_SAVE_KEY,
29
    LATEST_WEIGHT_SAVE_KEY,
30
    PROC_HISTORY_KEY,
31
    RAW_IMG_KEY,
32
    get_output_path,
33
    get_weight_path,
34
    max_n_cpu,
35
    package_name,
36
)
37

38
logger = logging.getLogger(__name__)
1✔
39

40

41
class PrerequisiteError(ProcessorError):
    """
    Error raised when a processor depends on another processor as a
    prerequisite, but that prerequisite is absent from the chain
    """
46

47

48
class NoCandidatesError(ProcessorError):
    """
    Error raised when a :class:`~winterdrp.processors.CandidateGenerator`
    yields no candidates at all
    """
53

54

55
class BaseProcessor:
    """
    Base processor class, to be inherited from for all processors
    """

    @property
    def base_key(self):
        """
        Unique key for the processor, to be used e.g in processing history tracking

        :return: None
        """
        raise NotImplementedError

    # Upper bound on worker threads spawned per base_apply call
    max_n_cpu: int = max_n_cpu

    # Registry of processor subclasses, keyed by base_key
    # (populated automatically by __init_subclass__)
    subclasses = {}

    def __init__(self):
        # Night name, set via set_night (last component of night_sub_dir)
        self.night = None
        # Subdirectory for the night's data, set via set_night
        self.night_sub_dir = None
        # Processors that run before this one, set via set_preceding_steps
        self.preceding_steps = None
        # Per-thread caches keyed by threading.get_ident(), so concurrent
        # base_apply calls from different threads do not share state
        self.passed_dataset = {}
        self.err_stack = {}

    @classmethod
    def __init_subclass__(cls, **kwargs):
        # Auto-register every subclass in the shared registry under its base_key
        super().__init_subclass__(**kwargs)
        cls.subclasses[cls.base_key] = cls

    def set_preceding_steps(self, previous_steps: list):
        """
        Provides processor with the list of preceding processors, and saves this

        :param previous_steps: list of processors
        :return: None
        """
        self.preceding_steps = previous_steps

    def set_night(self, night_sub_dir: str | int = ""):
        """
        Sets the night subdirectory for the processor to read/write data

        :param night_sub_dir: String/int subdirectory for night
        :return: None
        """
        self.night_sub_dir = night_sub_dir
        # NOTE(review): .split assumes a str; the annotated int input would
        # raise AttributeError here -- confirm intended input types
        self.night = night_sub_dir.split("/")[-1]

    def generate_error_report(
        self, exception: Exception, batch: DataBatch
    ) -> ErrorReport:
        """
        Generates an error report based on a python Exception

        :param exception: exception raised
        :param batch: batch which generated exception
        :return: error report
        """
        return ErrorReport(exception, self.__module__, batch.get_raw_image_names())

    def update_dataset(self, dataset: Dataset) -> Dataset:
        """
        Update a dataset after processing
        (no-op in the base class; hook for subclasses to override)

        :param dataset: Initial dataset
        :return: Updated dataset
        """
        return dataset

    def check_prerequisites(
        self,
    ):
        """
        Check to see if any prerequisite processors are missing
        (no checks in the base class)

        :return: None
        """

    def clean_cache(self, cache_id: int):
        """
        Function to clean the internal cache filled by base_apply

        :param cache_id: key for cache
        :return: None

        """
        del self.passed_dataset[cache_id]
        del self.err_stack[cache_id]

    def base_apply(self, dataset: Dataset) -> tuple[Dataset, ErrorStack]:
        """
        Core function to act on a dataset, and return an updated dataset

        :param dataset: Input dataset
        :return: Updated dataset, and any caught errors
        """
        # Cache results under the calling thread's ident, so concurrent
        # base_apply calls do not clobber each other's state
        cache_id = threading.get_ident()

        self.passed_dataset[cache_id] = Dataset()
        self.err_stack[cache_id] = ErrorStack()

        if len(dataset) > 0:

            # Never spawn more workers than there are batches to process
            n_cpu = min([self.max_n_cpu, len(dataset)])

            watchdog_queue = Queue()

            workers = []

            for _ in range(n_cpu):
                # Set up a worker thread to process database load
                worker = Thread(
                    target=self.apply_to_batch, args=(watchdog_queue, cache_id)
                )
                # Workers are never explicitly stopped; after the queue drains
                # they stay blocked on queue.get(). The daemon flag lets the
                # interpreter exit regardless.
                worker.daemon = True
                worker.start()

                workers.append(worker)

            for batch in dataset:
                watchdog_queue.put(item=batch)

            # Block until every queued batch has been marked task_done
            watchdog_queue.join()

        dataset = self.update_dataset(self.passed_dataset[cache_id])
        err_stack = self.err_stack[cache_id]

        # Free this thread's cache entries now that results are extracted
        self.clean_cache(cache_id=cache_id)

        return dataset, err_stack

    def apply_to_batch(self, queue, cache_id: int):
        """
        Function to run self.apply on a batch in the queue, catch any errors, and then
        update the internal cache with the results.

        :param queue: python threading queue
        :param cache_id: key for cache
        :return: None
        """
        while True:
            batch = queue.get()
            try:
                batch = self.apply(batch)
                self.passed_dataset[cache_id].append(batch)
            except NoncriticalProcessingError as exc:
                # Non-critical failure: record the error, but keep the batch
                err = self.generate_error_report(exc, batch)
                logger.error(err.generate_log_message())
                self.err_stack[cache_id].add_report(err)
                self.passed_dataset[cache_id].append(batch)
            except Exception as exc:  # pylint: disable=broad-except
                # Critical failure: record the error, and drop the batch
                err = self.generate_error_report(exc, batch)
                logger.error(err.generate_log_message())
                self.err_stack[cache_id].add_report(err)
            queue.task_done()

    def apply(self, batch: DataBatch):
        """
        Function applying the processor to a
        :class:`~winterdrp.data.base_data.DataBatch`.
        Also updates the processing history.

        :param batch: input data batch
        :return: updated data batch
        """
        batch = self._apply(batch)
        batch = self._update_processing_history(batch)
        return batch

    def _apply(self, batch: DataBatch) -> DataBatch:
        """
        Core function to update the :class:`~winterdrp.data.base_data.DataBatch`

        :param batch: Input data batch
        :return: updated data batch
        """
        raise NotImplementedError

    def _update_processing_history(
        self,
        batch: DataBatch,
    ) -> DataBatch:
        """
        Function to update the processing history of each
        :class:`~winterdrp.data.base_data.DataBlock` object in a
        :class:`~winterdrp.data.base_data.DataBatch`.

        :param batch: Input data batch
        :return: Updated data batch
        """
        for i, data_block in enumerate(batch):
            # Append this processor's key to the comma-separated history
            data_block[PROC_HISTORY_KEY] += self.base_key + ","
            # Provenance: who reduced the data, on which machine, when,
            # and with which software package
            data_block["REDUCER"] = getpass.getuser()
            data_block["REDMACH"] = socket.gethostname()
            data_block["REDTIME"] = str(datetime.datetime.now())
            data_block["REDSOFT"] = package_name
            batch[i] = data_block
        return batch
254

255

256
class CleanupProcessor(BaseProcessor, ABC):
    """
    Processor which 'cleans up' by deleting empty batches
    """

    def update_dataset(self, dataset: Dataset) -> Dataset:
        """
        Filter the dataset down to batches that still contain data

        :param dataset: dataset to clean
        :return: dataset containing only the non-empty batches
        """
        non_empty_batches = [
            batch for batch in dataset.get_batches() if len(batch) > 0
        ]
        return Dataset(non_empty_batches)
265

266

267
class ImageHandler:
    """
    Base class for handling images
    """

    @staticmethod
    def open_fits(path: str | Path) -> Image:
        """
        Open a fits file and return it as an Image object, filling in the
        raw-image and base-name header keys when they are missing

        :param path: Path of image
        :return: Image object
        """
        path = str(path)
        data, header = open_fits(path)
        # Backfill bookkeeping keys without clobbering existing values
        fallback_keys = {
            RAW_IMG_KEY: path,
            BASE_NAME_KEY: Path(path).name,
        }
        for key, value in fallback_keys.items():
            if key not in header:
                header[key] = value
        return Image(data=data, header=header)

    @staticmethod
    def save_fits(
        image: Image,
        path: str | Path,
    ):
        """
        Save an Image to path, recording the save location in its header

        :param image: Image to save
        :param path: path
        :return: None
        """
        path = str(path)
        header = image.get_header()
        data = image.get_data()
        if header is not None:
            header[LATEST_SAVE_KEY] = path
        logger.info(f"Saving to {path}")
        save_to_path(data, header, path)

    def save_weight_image(self, image: Image, img_path: Path) -> Path:
        """
        Save a weight image (1.0 where the parent image has data, 0.0 at NaNs)

        :param image: Weight image
        :param img_path: Path of parent image
        :return: Path of weight image
        """
        weight_path = get_weight_path(img_path)
        weight_data = (~np.isnan(image.get_data())).astype(float)
        header = image.get_header()
        header[LATEST_WEIGHT_SAVE_KEY] = str(weight_path)
        self.save_fits(Image(weight_data, header), weight_path)
        return weight_path

    @staticmethod
    def get_hash(image_batch: ImageBatch):
        """
        Get a unique hash for an image batch

        :param image_batch: image batch
        :return: unique hash for that batch
        """
        # Sort so the hash is independent of batch ordering
        components = sorted(
            img[BASE_NAME_KEY] + img[PROC_HISTORY_KEY] for img in image_batch
        )
        key = "".join(components)
        return hashlib.sha1(key.encode()).hexdigest()
336

337

338
class BaseImageProcessor(BaseProcessor, ImageHandler, ABC):
    """
    Base processor handling images in/images out
    """

    def _apply(self, batch: ImageBatch) -> ImageBatch:
        """Delegate to the image-specific implementation."""
        return self._apply_to_images(batch)

    def _apply_to_images(self, batch: ImageBatch) -> ImageBatch:
        """Transform an image batch; subclasses must implement this."""
        raise NotImplementedError
351

352

353
class ProcessorWithCache(BaseImageProcessor, ABC):
    """
    Image processor with cached images associated to it, e.g a master flat
    """

    def __init__(
        self,
        *args,
        try_load_cache: bool = True,
        write_to_cache: bool = True,
        overwrite: bool = True,
        cache_sub_dir: str = CAL_OUTPUT_SUB_DIR,
        **kwargs,
    ):
        """
        :param try_load_cache: whether to try loading an existing cached file
        :param write_to_cache: whether to save newly-made images to the cache
        :param overwrite: whether an existing cached file may be overwritten
        :param cache_sub_dir: subdirectory root for cached files
        """
        super().__init__(*args, **kwargs)
        self.try_load_cache = try_load_cache
        self.write_to_cache = write_to_cache
        self.overwrite = overwrite
        self.cache_sub_dir = cache_sub_dir

    def select_cache_images(self, images: ImageBatch) -> ImageBatch:
        """
        Select the appropriate cached image for the batch

        :param images: images to process
        :return: cached images to use
        """
        raise NotImplementedError

    def get_cache_path(self, images: ImageBatch) -> Path:
        """
        Gets path for saving/loading cached image, creating the parent
        directory if needed

        :param images: images to process
        :return: cache path
        """

        file_name = self.get_cache_file_name(images)

        output_path = get_output_path(
            base_name=file_name, dir_root=self.cache_sub_dir, sub_dir=self.night_sub_dir
        )

        output_path.parent.mkdir(parents=True, exist_ok=True)

        return output_path

    def get_cache_file_name(self, images: ImageBatch) -> str:
        """
        Get unique cache name for images

        :param images: images to process
        :return: unique hashed name
        """
        cache_images = self.select_cache_images(images)
        return f"{self.base_key}_{self.get_hash(cache_images)}.fits"

    def get_cache_file(self, images: ImageBatch) -> Image:
        """
        Return the appropriate cached image for the batch.

        Loads the image from disk when cache-loading is enabled and the file
        exists; otherwise builds it with make_image (and saves it if caching
        writes are enabled).

        :param images: images to process
        :return: cached image to use
        """

        path = self.get_cache_path(images)

        exists = path.exists()

        # Plain boolean logic (was np.logical_and on two scalars)
        if self.try_load_cache and exists:
            logger.info(f"Loading cached file {path}")
            return self.open_fits(path)

        image = self.make_image(images)

        if self.write_to_cache:
            # Save when the file is new, or when overwriting is allowed
            # (was np.sum([not exists, self.overwrite]) > 0)
            if not exists or self.overwrite:
                self.save_fits(image, path)

        return image

    def make_image(self, images: ImageBatch) -> Image:
        """
        Make a cached image (e.g master flat)

        :param images: images to use
        :return: cached image
        """
        raise NotImplementedError
442

443

444
class ProcessorPremadeCache(ProcessorWithCache, ABC):
    """
    Processor with pre-made master image
    """

    def __init__(self, master_image_path: str | Path, *args, **kwargs):
        """
        :param master_image_path: path of the pre-made master image
        """
        super().__init__(*args, **kwargs)
        self.master_image_path = Path(master_image_path)

    def get_cache_path(self, images: ImageBatch) -> Path:
        """Ignore the batch and return the fixed master-image path."""
        return self.master_image_path
455

456

457
class BaseCandidateGenerator(BaseProcessor, ImageHandler, ABC):
    """
    Base candidate-generator processor (image batch in, source batch out)
    """

    @classmethod
    def __init_subclass__(cls, **kwargs):
        # Register the subclass in the shared processor registry
        super().__init_subclass__(**kwargs)
        cls.subclasses[cls.base_key] = cls

    def _apply(self, batch: ImageBatch) -> SourceBatch:
        """
        Run candidate generation, raising NoCandidatesError when nothing
        is found
        """
        sources = self._apply_to_images(batch)

        if len(sources) == 0:
            err = "No sources found in image batch"
            logger.error(err)
            raise NoCandidatesError(err)

        return sources

    def _apply_to_images(self, batch: ImageBatch) -> SourceBatch:
        """Generate sources from images; subclasses must implement this."""
        raise NotImplementedError
479

480

481
class BaseDataframeProcessor(BaseProcessor, ABC):
    """
    Base dataframe processor (Source batch in, source batch out)
    """

    @classmethod
    def __init_subclass__(cls, **kwargs):
        # Register the subclass in the shared processor registry
        super().__init_subclass__(**kwargs)
        cls.subclasses[cls.base_key] = cls

    def _apply(self, batch: SourceBatch) -> SourceBatch:
        """Delegate to the candidate-specific implementation."""
        return self._apply_to_candidates(batch)

    def _apply_to_candidates(self, batch: SourceBatch) -> SourceBatch:
        """Transform a source batch; subclasses must implement this."""
        raise NotImplementedError
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc