
winter-telescope / winterdrp, build 3598996912 (push, via GitHub)
Commit: Write ErrorLog for __main__.py (#227)
Status: pending completion

25 of 25 new or added lines in 2 files covered (100.0%)
4572 of 6106 relevant lines covered (74.88%)
0.75 hits per line

Source file: /winterdrp/monitor/base_monitor.py (88.54% of lines covered)
"""
Script containing the :class:`~winterdrp.monitor.base_monitor.Monitor` class,
used for processing data in real time.
"""
import copy
import logging
import os
import sys
import threading
import time
import warnings
from pathlib import Path
from queue import Queue
from threading import Thread
from warnings import catch_warnings

import numpy as np
from astropy import units as u
from astropy.io import fits
from astropy.time import Time
from astropy.utils.exceptions import AstropyUserWarning
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer

from winterdrp.data import Dataset, ImageBatch
from winterdrp.errors import ErrorReport, ErrorStack
from winterdrp.paths import (
    __version__,
    base_raw_dir,
    get_output_path,
    max_n_cpu,
    package_name,
    raw_img_dir,
    raw_img_sub_dir,
    watchdog_email_key,
    watchdog_recipient_key,
)
from winterdrp.pipelines import get_pipeline
from winterdrp.processors.csvlog import CSVLog
from winterdrp.processors.utils.cal_hunter import CalRequirement, find_required_cals
from winterdrp.processors.utils.image_loader import ImageLoader
from winterdrp.utils.send_email import send_gmail

logger = logging.getLogger(__name__)


class NewImageHandler(FileSystemEventHandler):
    """Class to watch a directory, and add newly-created files to a queue."""

    def __init__(self, queue):
        FileSystemEventHandler.__init__(self)
        self.queue = queue

    def on_created(self, event):
        if event.event_type == "created":
            self.queue.put(event)


class Monitor:
    """Class to 'monitor' a directory, watching for newly created files.
    It then reduces these files. It will watch for a fixed duration,
    and run a postprocessing step at some configurable time after starting.
    It can send automated email notifications.
    """

    def __init__(
        self,
        night: str,
        pipeline: str,
        cal_requirements: list[CalRequirement] = None,
        realtime_configurations: str | list[str] = "default",
        postprocess_configurations: str | list[str] = None,
        email_sender: str = os.getenv(watchdog_email_key),
        email_recipients: str | list = os.getenv(watchdog_recipient_key),
        midway_postprocess_hours: float = 16.0,
        final_postprocess_hours: float = 48.0,
        log_level: str = "INFO",
        raw_dir: str = raw_img_sub_dir,
        base_raw_img_dir: str = base_raw_dir,
    ):

        logger.info(f"Software version: {package_name}=={__version__}")

        self.errorstack = ErrorStack()
        self.night = night
        self.pipeline_name = pipeline

        if not isinstance(realtime_configurations, list):
            realtime_configurations = [realtime_configurations]
        self.realtime_configurations = realtime_configurations

        self.postprocess_configurations = postprocess_configurations

        self.pipeline = get_pipeline(
            pipeline, night=night, selected_configurations=realtime_configurations
        )

        self.raw_image_directory = Path(
            raw_img_dir(
                sub_dir=self.pipeline.night_sub_dir,
                img_sub_dir=raw_dir,
                raw_dir=base_raw_img_dir,
            )
        )

        self.sub_dir = raw_dir

        if not self.raw_image_directory.exists():
            self.raw_image_directory.parent.mkdir(parents=True)

        self.log_level = log_level
        self.log_path = self.configure_logs(log_level)
        self.error_path = self.pipeline.get_error_output_path()

        check_email = np.sum([x is not None for x in [email_recipients, email_sender]])
        if np.sum(check_email) == 1:
            err = (
                "In order to send emails, you must specify both a sender"
                f" and a recipient. \n In this case, sender is {email_sender} "
                f"and recipient is {email_recipients}."
            )
            logger.error(err)
            raise ValueError(err)

        self.final_postprocess_hours = float(final_postprocess_hours) * u.hour
        logger.info(f"Will terminate after {final_postprocess_hours} hours.")
        self.t_start = Time.now()

        self.midway_postprocess_hours = float(midway_postprocess_hours) * u.hour

        if self.midway_postprocess_hours > self.final_postprocess_hours:
            logger.warning(
                f"Midway postprocessing was set to {self.midway_postprocess_hours}, "
                "but the monitor has a shorter termination period of "
                f"{self.final_postprocess_hours}. Setting to 95% of max wait."
            )
            self.midway_postprocess_hours = 0.95 * self.final_postprocess_hours

        if np.sum(check_email) == 2:
            logger.info(
                f"Will send an email summary after "
                f"{self.midway_postprocess_hours} hours."
            )
            self.email_info = (email_sender, email_recipients)
            self.email_to_send = True

        else:
            logger.info("No email notification configured.")
            self.email_info = None
            self.email_to_send = False

        self.midway_postprocess_complete = False
        self.latest_csv_log = None

        self.processed_science_images = []
        self.corrupted_images = []

        # default to "pipeline default cal requirements"

        if cal_requirements is None:
            cal_requirements = self.pipeline.default_cal_requirements

        if cal_requirements is not None:
            self.cal_images = find_required_cals(
                latest_dir=str(self.raw_image_directory),
                night=night,
                open_f=self.pipeline.load_raw_image,
                requirements=cal_requirements,
            )
        else:
            self.cal_images = ImageBatch()

    def summarise_errors(
        self,
        errorstack: ErrorStack,
    ):
        """Create a text summary using an errorstack and the list
        of processed images. Sends an email of this if configured
        to do so, or prints otherwise.

        :param errorstack: list of errors to summarise
        :return: None
        """

        error_summary = errorstack.summarise_error_stack(verbose=False)
        summary = (
            f"Processed a total of {len(self.processed_science_images)}"
            f" science images. \n\n {error_summary} \n"
        )

        logger.info(f"Writing error log to {self.error_path}")
        errorstack.summarise_error_stack(verbose=True, output_path=self.error_path)

        if self.email_info is not None:

            sender, recipients = self.email_info

            subject = f"{self.pipeline_name}: Summary for night {self.night}"

            attachments = [self.log_path, self.error_path]

            # Send the latest CSV log if there is one
            if self.latest_csv_log is not None:
                attachments.append(self.latest_csv_log)

            send_gmail(
                email_sender=sender,
                email_recipients=recipients,
                email_subject=subject,
                email_text=summary,
                attachments=attachments,
            )
        else:
            print(summary)

    def configure_logs(self, log_level="INFO"):
        """Function to configure the log level for the python logger.
        Posts the log to the terminal and also writes it to a file.

        :param log_level: python log level
        :return: log file path
        """

        log_output_path = get_output_path(
            base_name=f"{self.night}_processing_log.txt",
            dir_root=self.pipeline.night_sub_dir,
        )

        try:
            os.makedirs(os.path.dirname(log_output_path))
        except OSError:
            pass

        log = logging.getLogger("winterdrp")

        handler = logging.FileHandler(log_output_path)
        # handler = logging.StreamHandler(sys.stdout)
        formatter = logging.Formatter(
            "%(asctime)s: %(name)s [l %(lineno)d] - %(levelname)s - %(message)s"
        )
        handler.setFormatter(formatter)
        log.addHandler(handler)
        log.setLevel(log_level)

        root = logging.getLogger()
        root.setLevel(log_level)

        handler = logging.StreamHandler(sys.stdout)
        handler.setLevel(log_level)
        formatter = logging.Formatter(
            "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
        )
        handler.setFormatter(formatter)
        root.addHandler(handler)

        logger.info(f"Logging level: {self.log_level}, saving log to {log_output_path}")
        return log_output_path

    def process_realtime(self):
        """Function to initiate the actual monitoring.

        :return: None
        """
        # create queue
        monitor_queue = Queue()

        workers = []

        n_cpu = max_n_cpu

        for _ in range(n_cpu):
            # Set up a worker thread to process database load
            worker = Thread(target=self.process_load_queue, args=(monitor_queue,))
            worker.daemon = True
            worker.start()

            workers.append(worker)

        # setup watchdog to monitor directory for trigger files
        logger.info(f"Watching {self.raw_image_directory}")

        event_handler = NewImageHandler(monitor_queue)
        observer = Observer()
        observer.schedule(event_handler, path=str(self.raw_image_directory))
        observer.start()

        try:
            while (Time.now() - self.t_start) < self.final_postprocess_hours:
                time.sleep(2)
        finally:
            logger.info("No longer waiting for new images.")
            observer.stop()
            observer.join()
            self.postprocess()

    def update_error_log(self):
        """Function to overwrite the error file with the latest version.
        The error summary is cumulative, so this just updates the file.
        """
        self.errorstack.summarise_error_stack(verbose=True, output_path=self.error_path)

    def postprocess(self):
        """Function to be run after some realtime postprocessing has been run.
        This function is called once after a configurable number of hours
        (typically when the data is expected to be done), and then again
        when the monitor stops watching the directory.

        :return: None
        """
        self.update_error_log()

        logger.info("Running postprocess steps")

        if self.postprocess_configurations is not None:

            postprocess_config = [
                ImageLoader(
                    load_image=self.pipeline.unpack_raw_image,
                    input_sub_dir=self.sub_dir,
                    input_img_dir=str(Path(self.raw_image_directory)).split(
                        self.pipeline_name, maxsplit=1
                    )[0],
                )
            ]

            postprocess_config += self.pipeline.postprocess_configuration(
                errorstack=self.errorstack,
                processed_images=[
                    os.path.basename(x) for x in self.processed_science_images
                ],
                selected_configurations=self.postprocess_configurations,
            )

            protected_key = "_monitor"
            while protected_key in self.pipeline.all_pipeline_configurations.keys():
                protected_key += "_2"

            self.pipeline.add_configuration(protected_key, postprocess_config)
            self.pipeline.set_configuration(protected_key)

            for processor in self.pipeline.all_pipeline_configurations[protected_key]:
                if isinstance(processor, CSVLog):
                    self.latest_csv_log = processor.get_output_path()

            _, errorstack = self.pipeline.reduce_images(
                dataset=Dataset(ImageBatch()),
                selected_configurations=protected_key,
                catch_all_errors=True,
            )
            self.errorstack += errorstack
            self.update_error_log()

    def process_load_queue(self, queue: Queue):
        """This is the worker thread function. It is run in daemon
        threads that only exit when the main thread ends.

        Args
        ==========
          queue:  Queue() object
        """
        while True:

            if Time.now() - self.t_start > self.midway_postprocess_hours:
                if not self.midway_postprocess_complete:
                    self.midway_postprocess_complete = True
                    logger.info("Postprocess time!")
                    self.postprocess()
                    if self.email_to_send:
                        logger.info(
                            f"More than {self.midway_postprocess_hours} "
                            f"hours have elapsed. Sending summary email."
                        )
                        self.summarise_errors(errorstack=self.errorstack)

            if not queue.empty():
                event = queue.get()

                if event.src_path[-5:] == ".fits":

                    # Verify that file transfer is complete, useful for rsync latency

                    # Disclaimer: I (Robert) do not feel great about having written
                    # this code block.
                    # It seems to work though, let's hope no one finds out!
                    # I will cover my tracks by hiding the astropy warning which
                    # inspired this block, informing the user that the file
                    # is not as long as expected

                    check = False

                    while not check:
                        with catch_warnings():
                            warnings.filterwarnings(
                                "ignore", category=AstropyUserWarning
                            )
                            try:
                                with fits.open(event.src_path) as hdul:
                                    check = hdul._file.tell() == hdul._file.size
                            except OSError:
                                pass

                            if not check:
                                print(
                                    "Seems like the file is not fully transferred. "
                                    "Waiting a couple of seconds before trying again."
                                )
                                time.sleep(3)

                    try:

                        img = self.pipeline.load_raw_image(event.src_path)

                        is_science = img["OBSCLASS"] == "science"

                        all_img = ImageBatch(img) + copy.deepcopy(self.cal_images)

                        if not is_science:
                            print(f"Skipping {event.src_path} (calibration image)")
                        else:
                            print(
                                f"Reducing {event.src_path} (science image) "
                                f"on thread {threading.get_ident()}"
                            )
                            _, errorstack = self.pipeline.reduce_images(
                                dataset=Dataset(all_img),
                                selected_configurations=self.realtime_configurations,
                                catch_all_errors=True,
                            )
                            self.processed_science_images.append(event.src_path)
                            self.errorstack += errorstack
                            self.update_error_log()

                    # RS: Please forgive me for this coding sin
                    # I just want the monitor to never crash
                    except Exception as exc:  # pylint: disable=broad-except
                        err_report = ErrorReport(
                            exc, "monitor", contents=[event.src_path]
                        )
                        self.errorstack.add_report(err_report)
                        self.update_error_log()

            else:
                time.sleep(1)
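
For context, the sketch below shows how the Monitor class above might be driven, for example from the package's command-line entry point. The night label, pipeline name, and configuration name are illustrative assumptions rather than values taken from this file.

# Hypothetical usage sketch for the Monitor class above; the argument values
# are assumptions for illustration, not taken from this module.
from winterdrp.monitor.base_monitor import Monitor

if __name__ == "__main__":
    monitor = Monitor(
        night="20230115",                  # assumed night label
        pipeline="example_pipeline",       # assumed pipeline name known to get_pipeline
        realtime_configurations="default",
        midway_postprocess_hours=16.0,     # run postprocessing (and email, if configured) after 16 hours
        final_postprocess_hours=48.0,      # stop watching the directory after 48 hours
        log_level="INFO",
    )
    # Blocks until final_postprocess_hours elapse, reducing new .fits files as they arrive
    monitor.process_realtime()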