• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

angelolab / toffy / 11961352837

21 Nov 2024 08:44PM UTC coverage: 91.952% (-0.2%) from 92.104%
11961352837

Pull #467

github

web-flow
Merge 25e0126a9 into 5f16d6fa0
Pull Request #467: Enforce `exact_match=True` when listing JSON file for `get_estimated_time` for MPH

1421 of 1549 branches covered (91.74%)

Branch coverage included in aggregate %.

8 of 8 new or added lines in 4 files covered. (100.0%)

6 existing lines in 1 file now uncovered.

2715 of 2949 relevant lines covered (92.07%)

0.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.82
/src/toffy/fov_watcher.py
1
import logging
1✔
2
import os
1✔
3
import threading
1✔
4
import time
1✔
5
import warnings
1✔
6
from datetime import datetime
1✔
7
from multiprocessing import Lock
1✔
8
from pathlib import Path
1✔
9
from typing import Callable, Tuple, Union
1✔
10

11
import natsort as ns
1✔
12
import numpy as np
1✔
13
from matplotlib import pyplot as plt
1✔
14
from watchdog.events import (
1✔
15
    DirCreatedEvent,
16
    FileCreatedEvent,
17
    FileMovedEvent,
18
    FileSystemEventHandler,
19
)
20
from watchdog.observers import Observer
1✔
21

22
from toffy.json_utils import read_json_file
1✔
23

24

25
class RunStructure:
    """Expected bin and json files.

    Attributes:
        timeout (int):
            number of seconds to wait for a non-empty file before raising a TimeoutError
        fov_progress (dict):
            maps each expected FOV name to ``{"json": bool, "bin": bool}``, recording whether
            each of its files has been seen with a non-zero size
        processed_fovs (list):
            names of FOVs that have already been handled
        moly_points (list):
            names of FOVs whose ``standardTarget`` is ``"Molybdenum Foil"``
        highest_fov (int):
            number of expected (scan-1) FOVs, used to tell when the final FOV was processed
    """

    def __init__(self, run_folder: str, fov_timeout: int = 7800):
        """Initializes RunStructure by parsing run json within provided run folder.

        Args:
            run_folder (str):
                path to run folder; the run json is expected at
                ``<run_folder>/<run folder name>.json``
            fov_timeout (int):
                number of seconds to wait for non-null filesize before raising an error

        Raises:
            KeyError:
                if a FOV entry in the run json lacks ``runOrder`` or ``scanCount``
        """
        self.timeout = fov_timeout
        self.fov_progress = {}
        self.processed_fovs = []
        self.moly_points = []

        # find run .json and get parameters
        run_name = Path(run_folder).parts[-1]
        run_metadata = read_json_file(
            os.path.join(run_folder, f"{run_name}.json"), encoding="utf-8"
        )

        # parse run_metadata and populate expected structure
        for fov in run_metadata.get("fovs", ()):
            fov_name = self._parse_fov_name(fov, run_folder)
            if fov.get("standardTarget", "") == "Molybdenum Foil":
                self.moly_points.append(fov_name)

            self.fov_progress[fov_name] = {"json": False, "bin": False}

        # get the highest FOV number, needed for checking if final FOV processed
        # NOTE: only scan-1 files considered, so len is good
        self.highest_fov = len(self.fov_progress)

    @staticmethod
    def _parse_fov_name(fov: dict, run_folder: str) -> str:
        """Builds the expected ``fov-<runOrder>-scan-1`` name for one run-json FOV entry.

        Args:
            fov (dict):
                a single entry of the run json's ``"fovs"`` list
            run_folder (str):
                path to run folder, used only in the error message

        Returns:
            str:
                the FOV name

        Raises:
            KeyError:
                if ``runOrder`` or ``scanCount`` is missing (or negative)
        """
        run_order = fov.get("runOrder", -1)
        scan = fov.get("scanCount", -1)
        # check each value on its own: a product test misses the case where BOTH are missing
        if run_order < 0 or scan < 0:
            raise KeyError(f"Could not locate keys in {run_folder}.json")

        # scan 2's don't contain significant imaging data per new MIBI specs
        return f"fov-{run_order}-scan-1"

    def check_run_condition(self, path: str) -> Tuple[bool, str]:
        """Checks if all requisite files exist and are complete.

        Args:
            path (str):
                path to expected file

        Raises:
            TimeoutError:
                if the file stays empty for ``self.timeout`` seconds; the FOV is then
                removed from ``fov_progress``

        Returns:
            (bool, str):
                whether or not both json and bin files exist, as well as the name of the point
                (empty string when the file is not a recognised FOV file)
        """
        filename = Path(path).name

        # empty names and names starting with '.' (temp files) are ignored
        if not filename or filename.startswith("."):
            return False, ""

        # filename is not correct format of fov.bin or fov.json
        name_ext = filename.split(".")
        if len(name_ext) != 2:
            return False, ""

        fov_name, extension = name_ext

        # path no longer valid
        if not os.path.exists(path):
            warnings.warn(
                f"{path} doesn't exist but was recently created. This should be unreachable...",
                Warning,
            )
            return False, ""

        # avoids repeated processing in case of duplicated events
        if fov_name in self.processed_fovs:
            return False, fov_name

        # does not process moly points, but add to process list to ensure proper incrementing
        if fov_name in self.moly_points:
            self.processed(fov_name)
            self.fov_progress[fov_name]["json"] = True
            self.fov_progress[fov_name]["bin"] = True
            return False, fov_name

        if fov_name in self.fov_progress:
            if extension in self.fov_progress[fov_name]:
                # poll (in 10 steps of timeout / 10) until the file has data
                wait_time = 0
                while os.path.getsize(path) == 0:
                    # consider timed out fovs complete
                    if wait_time >= self.timeout:
                        del self.fov_progress[fov_name]
                        raise TimeoutError(f"timed out waiting for {path}...")

                    time.sleep(self.timeout / 10)
                    wait_time += self.timeout / 10

                self.fov_progress[fov_name][extension] = True

            if all(self.fov_progress[fov_name].values()):
                return True, fov_name

        elif extension == "bin":
            warnings.warn(f"Found unexpected bin file, {path}...", Warning)
            return False, ""

        return False, fov_name

    def processed(self, fov_name: str):
        """Notifies run structure that fov has been processed.

        Args:
            fov_name (str):
                Name of FoV
        """
        self.processed_fovs.append(fov_name)

    def check_fov_progress(self) -> dict:
        """Condenses internal dictionary to show which fovs have finished.

        Returns:
            dict:
                maps every non-moly FOV name to True when both its bin and json were seen
        """
        necessary_fovs = set(self.fov_progress).difference(self.moly_points)

        return {k: all(self.fov_progress[k].values()) for k in necessary_fovs}
158

159

160
class FOV_EventHandler(FileSystemEventHandler):
    """File event handler for FOV files.

    Attributes:
        run_folder (str):
            path to run folder
        log_path (str):
            path of the ``<run folder name>_log.txt`` log file inside the log folder
        run_structure (RunStructure):
            expected run file structure + fov_progress status
        fov_func (Callable):
            callback run on each fov, invoked as ``fov_func(run_folder, fov_name, overwrite)``
        run_func (Callable[[str], None]):
            callback run over the entire run, invoked as ``run_func(run_folder)``
        inter_func (Callable[[str], Any] | None):
            optional run callback re-run after every fov; its latest return value is kept in
            ``inter_return_vals``
        last_fov_num_processed (int):
            number of FOVs handled so far; used to detect FOVs that were skipped
        all_fovs_complete (bool):
            set once the run callbacks have been triggered, so they fire only once
    """

    instance_count = 0

    def __init__(
        self,
        run_folder: str,
        log_folder: str,
        fov_callback: Callable[[str, str], None],
        run_callback: Callable[[str], None],
        intermediate_callback: Callable[[str], None] = None,
        fov_timeout: int = 7800,
        watcher_timeout: int = 3 * 7800,
    ):
        """Initializes FOV_EventHandler.

        Files already present in `run_folder` are processed (in natural sort order) before
        this constructor returns.

        Args:
            run_folder (str):
                path to run folder
            log_folder (str):
                path to save outputs to
            fov_callback (Callable[[str, str], None]):
                callback to run on each fov
            run_callback (Callable[[str], None]):
                callback to run over the entire run
            intermediate_callback (Callable[[str], None]):
                run callback overriden to run on each fov
            fov_timeout (int):
                number of seconds to wait for non-null filesize before raising an error;
                also the polling period of the watcher timeout thread
            watcher_timeout (int):
                length to wait for new file generation before timing out
        """
        super().__init__()
        self.run_folder = run_folder
        self.last_event_time = datetime.now()

        self.log_path = os.path.join(log_folder, f"{Path(run_folder).parts[-1]}_log.txt")
        os.makedirs(log_folder, exist_ok=True)
        logging.basicConfig(
            level=logging.INFO,
            filename=self.log_path,
            filemode="a",
            format="%(name)s - %(levelname)s - %(message)s",
        )

        # create run structure
        self.run_structure = RunStructure(run_folder, fov_timeout=fov_timeout)

        self.fov_func = fov_callback
        self.run_func = run_callback
        self.inter_func = intermediate_callback
        self.inter_return_vals = None
        self.lock = Lock()
        self.last_fov_num_processed = 0
        self.all_fovs_complete = False

        # start the timeout thread only once every attribute `file_timer` reads exists
        self.timer_thread = threading.Thread(
            target=self.file_timer, args=(fov_timeout, watcher_timeout)
        )
        self.timer_thread.daemon = True
        self.timer_thread.start()

        # process files that were written before the watcher started
        for root, _, files in os.walk(run_folder):
            for name in ns.natsorted(files):
                # NOTE: don't call with check_last_fov to prevent duplicate processing
                self.on_created(FileCreatedEvent(os.path.join(root, name)), check_last_fov=False)

        # edge case if the last FOV gets written during the preprocessing stage:
        # simulate a trigger using the first FOV file. The trigger must sit in run_folder
        # itself (not whichever directory os.walk visited last), because _check_last_fov
        # looks for the final FOV's files next to it. Nothing to do if no FOVs are expected.
        fov_names = list(self.run_structure.fov_progress)
        if fov_names:
            self._check_last_fov(os.path.join(self.run_folder, fov_names[0] + ".bin"))

    def _check_fov_status(self, path: str):
        """Verifies the status of the file written at `path`.

        A TimeoutError from the run structure is logged, and the FOV is counted as
        processed so the watcher can still finish.

        Args:
            path (str):
                The path to check the status of

        Returns:
            Tuple[Optional[bool], Optional[str]]:
                The status of `path`, as well as the corresponding FOV name;
                ``(None, None)`` if the file timed out
        """
        try:
            return self.run_structure.check_run_condition(path)
        except TimeoutError as timeout_error:
            print(f"Encountered TimeoutError error: {timeout_error}")
            logging.warning(
                f'{datetime.now().strftime("%d/%m/%Y %H:%M:%S")} -- '
                f"{path} never reached non-zero file size...\n"
            )

            # these count as processed FOVs, so increment
            self.last_fov_num_processed += 1

            # these count as processed FOVs, so mark as processed
            self.run_structure.processed(Path(path).name.split(".")[0])
            self.check_complete()

            return None, None

    def _generate_callback_data(self, point_name: str, overwrite: bool):
        """Runs the `fov_func` and `inter_func` if applicable for a FOV.

        Args:
            point_name (str):
                The name of the FOV to run FOV (and intermediate if applicable) callbacks on
            overwrite (bool):
                Forces an overwrite of already existing data, needed if a FOV needs re-extraction
        """
        print(f"Discovered {point_name}, beginning per-fov callbacks...")
        logging.info(f'{datetime.now().strftime("%d/%m/%Y %H:%M:%S")} -- Extracting {point_name}\n')
        logging.info(
            f'{datetime.now().strftime("%d/%m/%Y %H:%M:%S")} -- '
            f"Running {self.fov_func.__name__} on {point_name}\n"
        )

        self.fov_func(self.run_folder, point_name, overwrite)
        self.run_structure.processed(point_name)

        if self.inter_func:
            # clear plots contained in intermediate return values if set
            if self.inter_return_vals:
                qc_plots = self.inter_return_vals.get("plot_qc_metrics", None)
                mph_plot = self.inter_return_vals.get("plot_mph_metrics", None)

                if qc_plots or mph_plot:
                    plt.cla()
                    plt.clf()
                    plt.close("all")

            self.inter_return_vals = self.inter_func(self.run_folder)

        self.check_complete()

    def _process_fov_range(self, start_index: int, stop_index: int):
        """Drives the FOV callbacks for ``fov-<i>-scan-1`` with start_index <= i < stop_index.

        Args:
            start_index (int):
                first FOV number to process
            stop_index (int):
                FOV number to stop before (exclusive)
        """
        for i in range(start_index, stop_index):
            fov_name = f"fov-{i}-scan-1"
            fov_bin_file = os.path.join(self.run_folder, fov_name + ".bin")
            fov_json_file = os.path.join(self.run_folder, fov_name + ".json")

            # this can happen if there's a lag copying files over
            # NOTE: proceeds as soon as either of the two files exists
            while not os.path.exists(fov_bin_file) and not os.path.exists(fov_json_file):
                time.sleep(60)

            self._fov_callback_driver(fov_bin_file)
            self._fov_callback_driver(fov_json_file)

    def _process_missed_fovs(self, path: str):
        """Given a `path`, check if there are any missing FOVs to process before it.

        Args:
            path (str):
                The path to check for missing FOVs prior
        """
        # verify the path provided is correct .bin type, if not skip
        name_ext = Path(path).name.split(".")
        if len(name_ext) != 2 or name_ext[1] != "bin":
            return

        # expected name shape: fov-<fov number>-scan-<scan number>
        name_parts = name_ext[0].split("-")
        try:
            scan_num = int(name_parts[3])
            fov_num = int(name_parts[1])
        except (IndexError, ValueError):
            # unexpected .bin name; check_run_condition warns about these
            return

        # NOTE: MIBI now only stores relevant data in scan 1, ignore any scans > 1
        if scan_num > 1:
            return

        # a difference of 1 from last_fov_num_processed means there are no in-between FOVs
        # set to <= for safety (this should never happen in theory)
        if fov_num - 1 <= self.last_fov_num_processed:
            return

        # NOTE: from observation, only the most recent FOV will ever be in danger of timing out
        # so all the FOVs processed in this function should already be fully processed
        self._process_fov_range(self.last_fov_num_processed + 1, fov_num)

    def _check_last_fov(self, path: str):
        """Checks if the last FOV's data has been written.

        Needed because there won't be any more file triggers after this happens.

        Args:
            path (str):
                The path that triggers this call. Only its parent directory is used, as the
                place to look for the last FOV's files.
        """
        # don't process if the last FOV has already been processed
        if self.last_fov_num_processed >= self.run_structure.highest_fov:
            return

        # define the name of the last FOV
        last_fov = f"fov-{self.run_structure.highest_fov}-scan-1"
        last_fov_bin = f"{last_fov}.bin"
        last_fov_json = f"{last_fov}.json"

        bin_dir = str(Path(path).parent)
        last_fov_data_exists = os.path.exists(
            os.path.join(bin_dir, last_fov_bin)
        ) and os.path.exists(os.path.join(bin_dir, last_fov_json))
        if not last_fov_data_exists:
            return

        # the last FOV has been written, so process everything up to it
        self._process_fov_range(
            self.last_fov_num_processed + 1, self.run_structure.highest_fov
        )

        # process the final bin file
        self._fov_callback_driver(os.path.join(self.run_folder, last_fov_bin))
        self._fov_callback_driver(os.path.join(self.run_folder, last_fov_json))

        # explicitly call check_complete to start run callbacks, since all FOVs are done
        self.check_complete()

    def _check_bin_updates(self):
        """Checks for, and re-runs if necessary, any incompletely extracted FOVs."""
        # iterate over a copy: re-extraction may delete timed-out FOVs from fov_progress
        for fov in list(self.run_structure.fov_progress):
            # skip moly points
            if fov in self.run_structure.moly_points:
                continue

            fov_bin_path = os.path.join(self.run_folder, fov + ".bin")
            fov_json_path = os.path.join(self.run_folder, fov + ".json")

            # if .bin file ctime > .json file ctime, incomplete extraction, need to re-extract
            fov_bin_create = Path(fov_bin_path).stat().st_ctime
            fov_json_create = Path(fov_json_path).stat().st_ctime

            if fov_bin_create > fov_json_create:
                warnings.warn(f"Re-extracting incompletely extracted FOV {fov}")
                logging.info(
                    f'{datetime.now().strftime("%d/%m/%Y %H:%M:%S")} -- Re-extracting {fov}\n'
                )
                logging.info(
                    f'{datetime.now().strftime("%d/%m/%Y %H:%M:%S")} -- '
                    f"Running {self.fov_func.__name__} on {fov}\n"
                )

                # since reprocessing needed, remove from self.processed_fovs
                if fov in self.run_structure.processed_fovs:
                    self.run_structure.processed_fovs.remove(fov)

                # re-extract the .bin file
                # NOTE: since no more FOVs are being written, last_fov_num_processed is irrelevant
                self._fov_callback_driver(fov_bin_path, overwrite=True)

    def _fov_callback_driver(self, file_trigger: str, overwrite: bool = False):
        """The FOV and intermediate-level callback motherbase for a single .bin file.

        Args:
            file_trigger (str):
                The file that gets caught by the watcher to throw into the pipeline
            overwrite (bool):
                Forces an overwrite of already existing data, needed if a FOV needs re-extraction
        """
        # check if what's created is in the run structure
        fov_ready, point_name = self._check_fov_status(file_trigger)

        if fov_ready:
            self._generate_callback_data(point_name, overwrite=overwrite)

        # needs to update if .bin file processed OR new moly point detected
        is_moly = point_name in self.run_structure.moly_points
        is_processed = point_name in self.run_structure.processed_fovs
        if fov_ready or (is_moly and not is_processed):
            self.last_fov_num_processed += 1

    def _run_callbacks(
        self, event: Union[DirCreatedEvent, FileCreatedEvent, FileMovedEvent], check_last_fov: bool
    ):
        """The pipeline runner, invoked when a new event is seen.

        Args:
            event (Union[DirCreatedEvent, FileCreatedEvent, FileMovedEvent]):
                The type of event seen. File/directory creation and file renaming are supported.
            check_last_fov (bool):
                Whether to invoke `_check_last_fov` on the event
        """
        # creations report the new path in src_path, renames in dest_path
        if isinstance(event, (DirCreatedEvent, FileCreatedEvent)):
            file_trigger = event.src_path
        else:
            file_trigger = event.dest_path

        # process any FOVs that got missed on the previous iteration of on_created/on_moved
        self._process_missed_fovs(file_trigger)

        # run the fov callback process on the file
        self._fov_callback_driver(file_trigger)

        if check_last_fov:
            self._check_last_fov(file_trigger)

    def on_created(self, event: FileCreatedEvent, check_last_fov: bool = True):
        """Handles file creation events.

        If FOV structure is completed, the fov callback, `self.fov_func` will be run over the data.
        This function is automatically called; users generally shouldn't call this function

        Args:
            event (FileCreatedEvent): file creation event
            check_last_fov (bool): whether to check if the last fov has been processed
        """
        # reset event creation time
        self.last_event_time = datetime.now()

        # this happens if _check_last_fov gets called by a prior FOV, no need to reprocess
        # (>= because re-extraction can push the counter past highest_fov)
        if self.last_fov_num_processed >= self.run_structure.highest_fov:
            return

        with self.lock:
            super().on_created(event)
            self._run_callbacks(event, check_last_fov)

    def file_timer(self, fov_timeout, watcher_timeout):
        """Checks time since last file was generated.

        Runs in the daemon timer thread until the run callbacks have been triggered, either
        normally (via `check_complete`) or by this timer after `watcher_timeout` seconds
        without a file creation event.

        Args:
            fov_timeout (int):
                how long to wait for fov data to be generated once file detected;
                used as the polling period
            watcher_timeout (int):
                length to wait for new file generation before timing out
        """
        while not self.all_fovs_complete:
            time_elapsed = (datetime.now() - self.last_event_time).total_seconds()

            # 3 fov cycles and no new files --> timeout
            if time_elapsed > watcher_timeout:
                fov_names = list(self.run_structure.fov_progress.keys())
                fov_num = self.last_fov_num_processed
                # the counter can exceed the list length (timed-out FOVs are removed)
                fov_name = fov_names[fov_num] if 0 <= fov_num < len(fov_names) else "remaining FOV"
                print(f"Timed out waiting for {fov_name} files to be generated.")
                logging.info(
                    f'{datetime.now().strftime("%d/%m/%Y %H:%M:%S")} -- Timed out '
                    f"waiting for {fov_name} files to be generated.\n"
                )
                logging.info(
                    f'{datetime.now().strftime("%d/%m/%Y %H:%M:%S")} -- '
                    f"Running {self.run_func.__name__} on FOVs\n"
                )

                # mark remaining fovs as completed to exit watcher
                for name in fov_names:
                    self.run_structure.fov_progress[name] = {"json": True, "bin": True}

                # stop check_complete from triggering the run callbacks a second time
                self.all_fovs_complete = True

                # trigger run callbacks
                self.run_func(self.run_folder)
                break
            time.sleep(fov_timeout)

    def on_moved(self, event: FileMovedEvent, check_last_fov: bool = True):
        """Handles file renaming events.

        If FOV structure is completed, the fov callback, `self.fov_func` will be run over the data.
        This function is automatically called; users generally shouldn't call this function

        Args:
            event (FileMovedEvent): file moved event
            check_last_fov (bool): whether to check if last fov was processed
        """
        # this happens if _check_last_fov gets called by a prior FOV, no need to reprocess
        # (>= because re-extraction can push the counter past highest_fov)
        if self.last_fov_num_processed >= self.run_structure.highest_fov:
            return

        with self.lock:
            super().on_moved(event)
            self._run_callbacks(event, check_last_fov)

    def check_complete(self):
        """Checks run structure fov_progress status.

        If every non-moly FOV is complete (and this has not already happened), re-extracts any
        incompletely extracted FOVs and then runs `run_func` over the whole run, once.

        NOTE: bin files that had new data written will first need to be re-extracted.
        """
        if all(self.run_structure.check_fov_progress().values()) and not self.all_fovs_complete:
            self.all_fovs_complete = True
            self._check_bin_updates()
            logging.info(f'{datetime.now().strftime("%d/%m/%Y %H:%M:%S")} -- All FOVs finished\n')
            logging.info(
                f'{datetime.now().strftime("%d/%m/%Y %H:%M:%S")} -- '
                f"Running {self.run_func.__name__} on whole run\n"
            )

            self.run_func(self.run_folder)
565

566

567
def start_watcher(
    run_folder: str,
    log_folder: str,
    fov_callback: Callable[[str, str], None],
    run_callback: Callable[[None], None],
    intermediate_callback: Callable[[str, str], None] = None,
    run_folder_timeout: int = 5400,
    completion_check_time: int = 30,
    zero_size_timeout: int = 7800,
    watcher_timeout: int = 3 * 7800,
):
    """Passes bin files to provided callback functions as they're created.

    Blocks until every non-moly FOV is complete (or the user interrupts), then stops the
    file observer.

    Args:
        run_folder (str):
            path to run folder
        log_folder (str):
            where to create log file
        fov_callback (Callable[[str, str], None]):
            function to run on each completed fov. assemble this using
            `watcher_callbacks.build_callbacks`
        run_callback (Callable[[str], None]):
            function ran once the run has completed. assemble this using
            `watcher_callbacks.build_callbacks`
        intermediate_callback (Callable[[str], None]):
            function defined as run callback overriden as fov callback. assemble this using
            `watcher_callbacks.build_callbacks`
        run_folder_timeout (int):
            how long to wait for the run folder to appear before timing out, in seconds.
            note that the watcher cannot begin until this run folder appears.
        completion_check_time (int):
            how long to wait before checking watcher completion, in seconds.
            note, this doesn't affect the watcher itself, just when this wrapper function exits.
        zero_size_timeout (int):
            number of seconds to wait for non-zero file size
        watcher_timeout (int):
            number of seconds to wait for new fov file generation

    Raises:
        FileNotFoundError:
            if `run_folder` still does not exist after `run_folder_timeout` seconds
    """
    # if the run folder specified isn't already there, ask the user to explicitly confirm the name
    if not os.path.exists(run_folder):
        warnings.warn(
            f"Waiting for {run_folder}. Please first double check that your run data "
            "doesn't already exist under a slightly different name in D:\\Data. "
            "Sometimes, the CACs change capitalization or add extra characters to the run folder. "
            "If this happens, stop the watcher and update the run_name variable in the notebook "
            "before trying again."
        )

    # allow the watcher to poll the run folder until it appears or times out
    poll_interval = run_folder_timeout / 10
    run_folder_wait_time = 0
    while not os.path.exists(run_folder) and run_folder_wait_time < run_folder_timeout:
        time.sleep(poll_interval)
        run_folder_wait_time += poll_interval

    # decide on the folder's presence, not on float equality of the accumulated wait time
    # (which can overshoot the timeout and silently skip this error)
    if not os.path.exists(run_folder):
        raise FileNotFoundError(
            f"Timed out waiting for {run_folder}. Make sure the run_name variable in the notebook "
            "matches up with the run folder name in D:\\Data, or try again a few minutes later "
            "if the run folder still hasn't shown up."
        )

    observer = Observer()
    event_handler = FOV_EventHandler(
        run_folder,
        log_folder,
        fov_callback,
        run_callback,
        intermediate_callback,
        zero_size_timeout,
        watcher_timeout,
    )
    observer.schedule(event_handler, run_folder, recursive=True)
    observer.start()

    try:
        while not all(event_handler.run_structure.check_fov_progress().values()):
            time.sleep(completion_check_time)
    except KeyboardInterrupt:
        # a manual interrupt ends the watch cleanly
        pass
    finally:
        observer.stop()
        observer.join()
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc