sgkit-dev / bio2zarr / 15017919138

14 May 2025 10:04AM UTC coverage: 98.166% (+0.01%) from 98.153%

Pull Request #385: WIP - Optional deps
Merge 4dd5b3863 into 587a29e79

28 of 28 new or added lines in 5 files covered. (100.0%)
2 existing lines in 2 files now uncovered.
2784 of 2836 relevant lines covered (98.17%)
3.93 hits per line

Source File: /bio2zarr/core.py (99.56% covered)

import concurrent.futures as cf
import contextlib
import dataclasses
import functools
import importlib
import json
import logging
import math
import multiprocessing
import os
import os.path
import threading
import time

import humanfriendly
import numcodecs
import numpy as np
import tqdm
import zarr

logger = logging.getLogger(__name__)

numcodecs.blosc.use_threads = False


def requires_optional_dependency(module_name, extras_name):
    """Decorator to check for optional dependencies"""

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            try:
                importlib.import_module(module_name)
            except ImportError:
                raise ImportError(
                    f"This process requires the optional {module_name} module. "
                    f"Install it with: pip install bio2zarr[{extras_name}]"
                )
            return func(*args, **kwargs)

        return wrapper

    return decorator


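# Example (editor's sketch, not part of bio2zarr): how the decorator above
# might be used. The "cyvcf2" module and the "vcf" extra are illustrative
# assumptions, not taken from this file.
@requires_optional_dependency("cyvcf2", "vcf")
def _example_vcf_reader(path):
    import cyvcf2  # the decorator has already verified this is importable

    return cyvcf2.VCF(path)
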

def display_number(x):
    ret = "n/a"
    if math.isfinite(x):
        ret = f"{x: 0.2g}"
    return ret


def display_size(n):
    return humanfriendly.format_size(n, binary=True)


def parse_max_memory(max_memory):
    if max_memory is None:
        # Effectively unbounded
        return 2**63
    if isinstance(max_memory, str):
        max_memory = humanfriendly.parse_size(max_memory)
    logger.info(f"Set memory budget to {display_size(max_memory)}")
    return max_memory


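# Example (editor's sketch, not part of bio2zarr): parse_max_memory accepts
# None, a plain byte count, or a humanfriendly size string.
def _example_parse_max_memory():
    assert parse_max_memory(None) == 2**63  # effectively unbounded
    assert parse_max_memory(1024) == 1024
    assert parse_max_memory("1 KiB") == 1024
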

def min_int_dtype(min_value, max_value):
    if min_value > max_value:
        raise ValueError("min_value must be <= max_value")
    for a_dtype in ["i1", "i2", "i4", "i8"]:
        info = np.iinfo(a_dtype)
        if info.min <= min_value and max_value <= info.max:
            return a_dtype
    raise OverflowError("Integer cannot be represented")


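# Example (editor's sketch, not part of bio2zarr): the narrowest signed
# integer dtype that can represent the whole range is returned.
def _example_min_int_dtype():
    assert min_int_dtype(0, 127) == "i1"    # fits in int8
    assert min_int_dtype(-129, 0) == "i2"   # -129 overflows int8
    assert min_int_dtype(0, 2**40) == "i8"  # beyond int32
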

def chunk_aligned_slices(z, n, max_chunks=None):
    """
    Returns at most n slices in the specified zarr array, aligned
    with its chunks.
    """
    chunk_size = z.chunks[0]
    num_chunks = int(np.ceil(z.shape[0] / chunk_size))
    if max_chunks is not None:
        num_chunks = min(num_chunks, max_chunks)
    slices = []
    splits = np.array_split(np.arange(num_chunks), min(n, num_chunks))
    for split in splits:
        start = split[0] * chunk_size
        stop = (split[-1] + 1) * chunk_size
        stop = min(stop, z.shape[0])
        slices.append((start, stop))
    return slices


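# Example (editor's sketch, not part of bio2zarr): a 10-element array with
# chunk size 3 split into at most 4 slices; every boundary except the final
# one falls on a chunk edge.
def _example_chunk_aligned_slices():
    z = zarr.zeros(10, chunks=3, dtype="i4")
    assert chunk_aligned_slices(z, 4) == [(0, 3), (3, 6), (6, 9), (9, 10)]
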

def first_dim_slice_iter(z, start, stop):
    """
    Efficiently iterate over the specified slice of the first dimension of the zarr
    array z.
    """
    chunk_size = z.chunks[0]
    first_chunk = start // chunk_size
    last_chunk = (stop // chunk_size) + (stop % chunk_size != 0)
    for chunk in range(first_chunk, last_chunk):
        Z = z.blocks[chunk]
        chunk_start = chunk * chunk_size
        chunk_stop = chunk_start + chunk_size
        slice_start = None
        if start > chunk_start:
            slice_start = start - chunk_start
        slice_stop = None
        if stop < chunk_stop:
            slice_stop = stop - chunk_start
        yield from Z[slice_start:slice_stop]


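# Example (editor's sketch, not part of bio2zarr): only the chunks
# overlapping [2, 7) are fetched, and their edges are trimmed to the
# requested bounds.
def _example_first_dim_slice_iter():
    z = zarr.array(np.arange(10), chunks=3)
    assert list(first_dim_slice_iter(z, 2, 7)) == [2, 3, 4, 5, 6]
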

def du(path):
    """
    Return the total bytes stored at this path.
    """
    total = os.path.getsize(path)
    # pathlib walk method doesn't exist until 3.12 :(
    for root, dirs, files in os.walk(path):
        for lst in [dirs, files]:
            for name in lst:
                fullname = os.path.join(root, name)
                size = os.path.getsize(fullname)
                total += size
    logger.debug(f"du({path}) = {total}")
    return total


class SynchronousExecutor(cf.Executor):
    # Arguably we should use workers=0 as the default and use this
    # executor implementation. However, the docs are fairly explicit
    # about saying we shouldn't instantiate Future objects directly,
    # so it's best to keep this as a semi-secret debugging interface
    # for now.
    def submit(self, fn, /, *args, **kwargs):
        future = cf.Future()
        future.set_result(fn(*args, **kwargs))
        return future


def wait_on_futures(futures):
    for future in cf.as_completed(futures):
        exception = future.exception()
        if exception is not None:
            cancel_futures(futures)
            if isinstance(exception, cf.process.BrokenProcessPool):
                # This raise is among the lines left uncovered in this
                # build (see the report summary above).
                raise RuntimeError(
                    "Worker process died: you may have run out of memory"
                ) from exception
            else:
                raise exception


def cancel_futures(futures):
    for future in futures:
        future.cancel()


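# Example (editor's sketch, not part of bio2zarr): wait_on_futures re-raises
# the first failure it sees; here everything succeeds. SynchronousExecutor
# runs each submission inline, so no worker processes are involved.
def _example_wait_on_futures():
    executor = SynchronousExecutor()
    futures = [executor.submit(math.sqrt, x) for x in [4.0, 9.0]]
    wait_on_futures(futures)
    assert sorted(f.result() for f in futures) == [2.0, 3.0]
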

@dataclasses.dataclass
class BufferedArray:
    array: zarr.Array
    array_offset: int
    name: str
    buff: np.ndarray
    buffer_row: int
    max_buff_size: int = 0

    def __init__(self, array, offset, name="Unknown"):
        self.array = array
        self.array_offset = offset
        assert offset % array.chunks[0] == 0
        self.name = name
        dims = list(array.shape)
        dims[0] = min(array.chunks[0], array.shape[0])
        self.buff = np.empty(dims, dtype=array.dtype)
        # Explicitly fill with zeros here to make any out-of-memory errors happen
        # quickly.
        self.buff[:] = 0
        self.buffer_row = 0

    @property
    def variants_chunk_size(self):
        return self.buff.shape[0]

    def next_buffer_row(self):
        if self.buffer_row == self.variants_chunk_size:
            self.flush()
        row = self.buffer_row
        self.buffer_row += 1
        return row

    def flush(self):
        if self.buffer_row != 0:
            if len(self.array.chunks) <= 1:
                sync_flush_1d_array(
                    self.buff[: self.buffer_row], self.array, self.array_offset
                )
            else:
                sync_flush_2d_array(
                    self.buff[: self.buffer_row], self.array, self.array_offset
                )
            logger.debug(
                f"Flushed <{self.name} {self.array.shape} "
                f"{self.array.dtype}> "
                f"{self.array_offset}:{self.array_offset + self.buffer_row}"
                f"{self.buff.nbytes / 2**20: .2f}Mb"
            )
            # Note this is inaccurate for string data as we're just reporting the
            # size of the container. When we switch to the numpy 2 StringDtype this
            # should improve and we can get more visibility on how memory
            # is being used.
            # https://github.com/sgkit-dev/bio2zarr/issues/30
            self.max_buff_size = max(self.max_buff_size, self.buff.nbytes)
            self.array_offset += self.variants_chunk_size
            self.buffer_row = 0


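# Example (editor's sketch, not part of bio2zarr): the intended write
# pattern. Rows are staged in the buffer and written out a chunk at a time;
# a final explicit flush() empties any partial chunk.
def _example_buffered_array():
    z = zarr.zeros((10, 5), chunks=(4, 5), dtype="i4")
    ba = BufferedArray(z, offset=0, name="example")
    for value in range(10):
        j = ba.next_buffer_row()  # flushes automatically on chunk boundaries
        ba.buff[j] = value
    ba.flush()
    assert z[9, 0] == 9
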

def sync_flush_1d_array(np_buffer, zarr_array, offset):
    zarr_array[offset : offset + np_buffer.shape[0]] = np_buffer
    update_progress(np_buffer.nbytes)


def sync_flush_2d_array(np_buffer, zarr_array, offset):
    # Write chunks in the second dimension 1-by-1 to make progress more
    # incremental, and to avoid large memcopies in the underlying
    # encoder implementations.
    s = slice(offset, offset + np_buffer.shape[0])
    samples_chunk_size = zarr_array.chunks[1]
    # TODO use zarr chunks here for simplicity
    zarr_array_width = zarr_array.shape[1]
    start = 0
    while start < zarr_array_width:
        stop = min(start + samples_chunk_size, zarr_array_width)
        chunk_buffer = np_buffer[:, start:stop]
        zarr_array[s, start:stop] = chunk_buffer
        update_progress(chunk_buffer.nbytes)
        start = stop


@dataclasses.dataclass
class ProgressConfig:
    total: int = 0
    units: str = ""
    title: str = ""
    show: bool = False
    poll_interval: float = 0.01


# NOTE: this approach means that we cannot have more than one
# progressable thing happening per source process. This is
# probably fine in practice, but there could be corner cases
# where it's not. Something to watch out for.
_progress_counter = None


def update_progress(inc):
    # If the _progress_counter has not been set we are working in a
    # synchronous non-progress tracking context
    if _progress_counter is not None:
        with _progress_counter.get_lock():
            _progress_counter.value += inc


def get_progress():
    with _progress_counter.get_lock():
        val = _progress_counter.value
    return val


def setup_progress_counter(counter):
    global _progress_counter
    _progress_counter = counter


class ParallelWorkManager(contextlib.AbstractContextManager):
    def __init__(self, worker_processes=1, progress_config=None):
        # Need to specify this explicitly to support Macs and
        # for future proofing.
        ctx = multiprocessing.get_context("spawn")
        global _progress_counter
        _progress_counter = ctx.Value("Q", 0)
        if worker_processes <= 0:
            # NOTE: this is only for testing and debugging, not for
            # production. See note on the SynchronousExecutor class.
            self.executor = SynchronousExecutor()
        else:
            self.executor = cf.ProcessPoolExecutor(
                max_workers=worker_processes,
                mp_context=ctx,
                initializer=setup_progress_counter,
                initargs=(_progress_counter,),
            )
        self.futures = set()

        if progress_config is None:
            progress_config = ProgressConfig()
        self.progress_config = progress_config
        self.progress_bar = tqdm.tqdm(
            total=progress_config.total,
            desc=f"{progress_config.title:>8}",
            unit_scale=True,
            unit=progress_config.units,
            smoothing=0.1,
            disable=not progress_config.show,
        )
        self.completed = False
        self.completed_lock = threading.Lock()
        self.progress_thread = threading.Thread(
            target=self._update_progress_worker,
            name="progress-update",
            daemon=True,  # Avoids deadlock on exit in awkward error conditions
        )
        self.progress_thread.start()

    def _update_progress(self):
        current = get_progress()
        inc = current - self.progress_bar.n
        self.progress_bar.update(inc)

    def _update_progress_worker(self):
        completed = False
        while not completed:
            self._update_progress()
            time.sleep(self.progress_config.poll_interval)
            with self.completed_lock:
                completed = self.completed
        logger.debug("Exit progress thread")

    def submit(self, *args, **kwargs):
        future = self.executor.submit(*args, **kwargs)
        self.futures.add(future)
        return future

    def results_as_completed(self):
        for future in cf.as_completed(self.futures):
            yield future.result()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type is None:
            wait_on_futures(self.futures)
        else:
            cancel_futures(self.futures)
        # There's probably a much cleaner way of doing this with a Condition
        # or something, but this seems to work OK for now. This setup might
        # make small conversions a bit laggy as we wait on the sleep interval
        # though.
        with self.completed_lock:
            self.completed = True
        self.executor.shutdown(wait=False)
        # FIXME there's currently something weird happening at the end of
        # Encode 1D for 1kg-p3. The progress bar disappears, like we're
        # setting a total of zero or something.
        self.progress_thread.join()
        self._update_progress()
        self.progress_bar.close()
        return False


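# Example (editor's sketch, not part of bio2zarr): submitting work and
# consuming results as they complete. worker_processes=0 selects the
# synchronous debugging executor, so the example runs in-process.
def _example_parallel_work_manager():
    with ParallelWorkManager(worker_processes=0) as pwm:
        for n in [1, 2, 3]:
            pwm.submit(math.factorial, n)
        results = sorted(pwm.results_as_completed())
    assert results == [1, 2, 6]
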

class JsonDataclass:
    def asdict(self):
        return dataclasses.asdict(self)

    def asjson(self):
        return json.dumps(self.asdict(), indent=4)


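# Example (editor's sketch, not part of bio2zarr): any dataclass gains
# dict/JSON serialization by mixing JsonDataclass in.
@dataclasses.dataclass
class _ExampleSchema(JsonDataclass):
    name: str = "demo"
    samples: int = 0


# _ExampleSchema(samples=3).asjson() produces:
# {
#     "name": "demo",
#     "samples": 3
# }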