• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

sgkit-dev / bio2zarr / 18940786519

30 Oct 2025 12:34PM UTC coverage: 98.361% (+0.07%) from 98.292%
18940786519

Pull #422

github

web-flow
Merge b33cd4646 into 3f955cef0
Pull Request #422: Update intro.md with VCF Zarr conversion info

2880 of 2928 relevant lines covered (98.36%)

3.93 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

99.12
/bio2zarr/core.py
1
import concurrent.futures as cf
4✔
2
import contextlib
4✔
3
import dataclasses
4✔
4
import functools
4✔
5
import importlib
4✔
6
import json
4✔
7
import logging
4✔
8
import math
4✔
9
import multiprocessing
4✔
10
import os
4✔
11
import os.path
4✔
12
import threading
4✔
13
import time
4✔
14

15
import humanfriendly
4✔
16
import numcodecs
4✔
17
import numpy as np
4✔
18
import tqdm
4✔
19
import zarr
4✔
20

21
# Module-level logger for everything in bio2zarr.core.
logger = logging.getLogger(__name__)

# Disable Blosc's internal threading. NOTE(review): presumably because
# compression already runs inside worker processes spawned by this module,
# so nested threads would be redundant — confirm against numcodecs guidance.
numcodecs.blosc.use_threads = False
24

25

26
def requires_optional_dependency(module_name, extras_name):
    """Decorator factory that guards a function behind an optional import.

    The wrapped function raises ImportError with an installation hint
    (``pip install bio2zarr[extras_name]``) if ``module_name`` cannot be
    imported at call time; otherwise it runs unchanged.
    """

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            try:
                importlib.import_module(module_name)
            except ImportError:
                message = (
                    f"This process requires the optional {module_name} module. "
                    f"Install it with: pip install bio2zarr[{extras_name}]"
                )
                # `from None` suppresses the original ImportError chain so the
                # user sees only the actionable install hint.
                raise ImportError(message) from None
            return func(*args, **kwargs)

        return wrapper

    return decorator
44

45

46
def display_number(x):
    """Format *x* with two significant figures, or "n/a" if not finite."""
    if not math.isfinite(x):
        return "n/a"
    return f"{x: 0.2g}"
51

52

53
def display_size(n):
    """Return *n* bytes as a human-readable binary size (KiB, MiB, ...)."""
    formatted = humanfriendly.format_size(n, binary=True)
    return formatted
55

56

57
def parse_max_memory(max_memory):
    """Normalise a memory budget to an integer number of bytes.

    ``None`` means effectively unbounded (2**63 is returned); strings such
    as "8GiB" are parsed with humanfriendly; integers pass through as-is.
    The resulting budget is logged at INFO level.
    """
    if max_memory is None:
        # Effectively unbounded
        return 2**63
    budget = (
        humanfriendly.parse_size(max_memory)
        if isinstance(max_memory, str)
        else max_memory
    )
    logger.info(f"Set memory budget to {display_size(budget)}")
    return budget
65

66

67
def min_int_dtype(min_value, max_value):
    """Return the smallest signed-integer dtype string ("i1".."i8") whose
    range covers [min_value, max_value].

    Raises ValueError if min_value > max_value, and OverflowError if even
    64-bit integers cannot represent the range.
    """
    if min_value > max_value:
        raise ValueError("min_value must be <= max_value")
    for candidate in ("i1", "i2", "i4", "i8"):
        bounds = np.iinfo(candidate)
        # min_value <= max_value is already guaranteed above.
        if bounds.min <= min_value and max_value <= bounds.max:
            return candidate
    raise OverflowError("Integer cannot be represented")
75

76

77
def chunk_aligned_slices(z, n, max_chunks=None):
    """
    Partition the first dimension of zarr array *z* into at most *n*
    contiguous (start, stop) slices, each aligned to chunk boundaries.
    If *max_chunks* is given, only the first *max_chunks* chunks are
    covered. The final slice is clipped to the array length.
    """
    chunk_size = z.chunks[0]
    total_chunks = int(np.ceil(z.shape[0] / chunk_size))
    if max_chunks is not None:
        total_chunks = min(total_chunks, max_chunks)
    groups = np.array_split(np.arange(total_chunks), min(n, total_chunks))
    return [
        (
            group[0] * chunk_size,
            min((group[-1] + 1) * chunk_size, z.shape[0]),
        )
        for group in groups
    ]
94

95

96
def first_dim_slice_iter(z, start, stop):
    """
    Yield the elements of z[start:stop] along the first dimension,
    reading whole chunks at a time so each chunk is decoded only once.
    """
    chunk_size = z.chunks[0]
    first_chunk = start // chunk_size
    last_chunk = stop // chunk_size + (1 if stop % chunk_size else 0)
    for index in range(first_chunk, last_chunk):
        block = z.blocks[index]
        block_start = index * chunk_size
        block_stop = block_start + chunk_size
        # Trim only the first and last blocks; None slices the whole block.
        lo = start - block_start if start > block_start else None
        hi = stop - block_start if stop < block_stop else None
        yield from block[lo:hi]
115

116

117
def du(path):
    """
    Return the total bytes stored at *path*: the entry itself plus, for a
    directory tree, every file and directory below it.
    """
    total = os.path.getsize(path)
    # pathlib's Path.walk doesn't exist until 3.12, so use os.walk here.
    for root, dirs, files in os.walk(path):
        for entry in dirs + files:
            total += os.path.getsize(os.path.join(root, entry))
    logger.debug(f"du({path}) = {total}")
    return total
131

132

133
# We set the default number of worker processes to 0 because it avoids
# complexity in the call chain and makes things easier to debug by
# default. However, it does use the SynchronousExecutor here, which
# is technically not recommended by the Python docs (it instantiates
# Future objects directly).
DEFAULT_WORKER_PROCESSES = 0
138

139

140
class SynchronousExecutor(cf.Executor):
    """Executor that runs each submitted task immediately in the caller.

    Since https://github.com/sgkit-dev/bio2zarr/issues/404 the default is
    worker_processes=0, which uses this implementation to keep the call
    chain simple and debuggable. The concurrent.futures docs are fairly
    explicit that Future objects should not be instantiated directly, so
    this may need revisiting if obscure problems start to arise.
    """

    def submit(self, fn, /, *args, **kwargs):
        """Run *fn* synchronously and return an already-completed Future."""
        result = fn(*args, **kwargs)
        completed = cf.Future()
        completed.set_result(result)
        return completed
151

152

153
def wait_on_futures(futures):
    """Block until all *futures* complete, re-raising the first failure.

    On the first failed future all remaining futures are cancelled. A
    BrokenProcessPool is translated into a RuntimeError pointing at the
    most likely cause (an OOM-killed worker); other exceptions propagate
    unchanged.
    """
    for completed in cf.as_completed(futures):
        err = completed.exception()
        if err is None:
            continue
        cancel_futures(futures)
        if isinstance(err, cf.process.BrokenProcessPool):
            raise RuntimeError(
                "Worker process died: you may have run out of memory"
            ) from err
        raise err
164

165

166
def cancel_futures(futures):
    """Best-effort cancellation of every future in *futures*.

    Futures that are already running or done ignore the request.
    """
    for pending in futures:
        pending.cancel()
169

170

171
@dataclasses.dataclass
class BufferedArray:
    """Chunk-sized in-memory write buffer in front of a zarr array.

    Rows are staged in a numpy buffer whose first dimension equals one
    chunk of the target array; flush() then writes a whole chunk-aligned
    slab at a time via the sync_flush_*_array helpers.

    Note: the explicit ``__init__`` below replaces the dataclass-generated
    one; the field declarations document the attributes and drive repr/eq.
    """

    array: zarr.Array  # destination zarr array
    array_offset: int  # row in `array` where the buffered rows will land
    name: str  # label used only in log messages
    buff: np.ndarray  # staging buffer; buff.shape[0] is one chunk of rows
    buffer_row: int  # index of the next free row in `buff`
    max_buff_size: int = 0  # high-water mark of buff.nbytes seen by flush()

    def __init__(self, array, offset, name="Unknown"):
        self.array = array
        self.array_offset = offset
        # Flushes write whole chunk-aligned slabs, so the starting offset
        # must itself be aligned to a chunk boundary.
        assert offset % array.chunks[0] == 0
        self.name = name
        dims = list(array.shape)
        # Buffer one chunk of rows (or the whole array if it is smaller).
        dims[0] = min(array.chunks[0], array.shape[0])
        self.buff = np.empty(dims, dtype=array.dtype)
        # Explicitly fill with zeros here to make any out-of-memory errors
        # happen quickly, rather than at some later write.
        self.buff[:] = 0
        self.buffer_row = 0

    @property
    def variants_chunk_size(self):
        """Number of rows the buffer holds (one chunk of the first dim)."""
        return self.buff.shape[0]

    def next_buffer_row(self):
        """Return the next free row index, flushing first if the buffer
        is already full."""
        if self.buffer_row == self.variants_chunk_size:
            self.flush()
        row = self.buffer_row
        self.buffer_row += 1
        return row

    def flush(self):
        """Write all buffered rows to the zarr array, advance the array
        offset by one chunk, and reset the buffer. No-op when empty."""
        if self.buffer_row != 0:
            # 1D arrays are written in one shot; 2D+ arrays are written
            # column-chunk by column-chunk (see sync_flush_2d_array).
            if len(self.array.chunks) <= 1:
                sync_flush_1d_array(
                    self.buff[: self.buffer_row], self.array, self.array_offset
                )
            else:
                sync_flush_2d_array(
                    self.buff[: self.buffer_row], self.array, self.array_offset
                )
            logger.debug(
                f"Flushed <{self.name} {self.array.shape} "
                f"{self.array.dtype}> "
                f"{self.array_offset}:{self.array_offset + self.buffer_row}"
                f"{self.buff.nbytes / 2**20: .2f}Mb"
            )
            # Note this is inaccurate for string data as we're just reporting the
            # size of the container. When we switch the numpy 2 StringDtype this
            # should improve and we can get more visibility on how memory
            # is being used.
            # https://github.com/sgkit-dev/bio2zarr/issues/30
            self.max_buff_size = max(self.max_buff_size, self.buff.nbytes)
            self.array_offset += self.variants_chunk_size
            self.buffer_row = 0
228

229

230
def sync_flush_1d_array(np_buffer, zarr_array, offset):
    """Write *np_buffer* into zarr_array starting at row *offset*, then
    report the number of bytes written to the progress counter."""
    rows = np_buffer.shape[0]
    zarr_array[offset : offset + rows] = np_buffer
    update_progress(np_buffer.nbytes)
233

234

235
def sync_flush_2d_array(np_buffer, zarr_array, offset):
    """Write the 2D *np_buffer* into zarr_array rows [offset, offset+len).

    Chunks in the second dimension are written one by one so that progress
    updates are incremental and the underlying encoder implementations
    avoid large memcopies.
    """
    row_slice = slice(offset, offset + np_buffer.shape[0])
    samples_chunk_size = zarr_array.chunks[1]
    # TODO use zarr chunks here for simplicity
    width = zarr_array.shape[1]
    for start in range(0, width, samples_chunk_size):
        stop = min(start + samples_chunk_size, width)
        piece = np_buffer[:, start:stop]
        zarr_array[row_slice, start:stop] = piece
        update_progress(piece.nbytes)
250

251

252
@dataclasses.dataclass
class ProgressConfig:
    """Configuration for the tqdm progress bar driven by ParallelWorkManager."""

    total: int = 0  # expected total number of work units
    units: str = ""  # tqdm unit label (e.g. "B" for bytes)
    title: str = ""  # short description shown at the left of the bar
    show: bool = False  # if False the bar is created but disabled
    poll_interval: float = 0.01  # seconds between progress-counter polls
259

260

261
# NOTE: this approach means that we cannot have more than one
# progressable thing happening per source process. This is
# probably fine in practise, but there could be corner cases
# where it's not. Something to watch out for.
# Shared multiprocessing counter; stays None until ParallelWorkManager
# (or its worker-process initializer) installs one.
_progress_counter = None
266

267

268
def update_progress(inc):
    """Atomically add *inc* to the shared progress counter, if one exists.

    A counter of None means we are running synchronously without progress
    tracking, in which case this is a no-op.
    """
    counter = _progress_counter
    if counter is None:
        return
    with counter.get_lock():
        counter.value += inc
274

275

276
def get_progress():
    """Return the current value of the shared progress counter."""
    with _progress_counter.get_lock():
        return _progress_counter.value
280

281

282
def setup_progress_counter(counter):
    # Worker-process initializer: installs the shared counter created by
    # ParallelWorkManager so update_progress() works in this process too.
    global _progress_counter
    _progress_counter = counter
285

286

287
class ParallelWorkManager(contextlib.AbstractContextManager):
    """Context manager that fans work out to an executor while a daemon
    thread mirrors the shared progress counter into a tqdm bar.

    With worker_processes <= 0 work runs synchronously in-process (see
    SynchronousExecutor); otherwise a spawn-context ProcessPoolExecutor is
    used and each worker installs the shared counter via
    setup_progress_counter. On exit, pending futures are waited on (or
    cancelled if an exception is propagating), the progress thread is shut
    down, and the bar is finalised.
    """

    def __init__(self, worker_processes=1, progress_config=None):
        # Need to specify this explicitly to support Macs and
        # for future proofing.
        ctx = multiprocessing.get_context("spawn")
        global _progress_counter
        # "Q" = unsigned 64-bit; shared across worker processes.
        _progress_counter = ctx.Value("Q", 0)
        if worker_processes <= 0:
            # NOTE: this is only for testing and debugging, not for
            # production. See note on the SynchronousExecutor class.
            self.executor = SynchronousExecutor()
        else:
            self.executor = cf.ProcessPoolExecutor(
                max_workers=worker_processes,
                mp_context=ctx,
                initializer=setup_progress_counter,
                initargs=(_progress_counter,),
            )
        self.futures = set()

        if progress_config is None:
            progress_config = ProgressConfig()
        self.progress_config = progress_config
        self.progress_bar = tqdm.tqdm(
            total=progress_config.total,
            desc=f"{progress_config.title:>8}",
            unit_scale=True,
            unit=progress_config.units,
            smoothing=0.1,
            disable=not progress_config.show,
        )
        # `completed` is flipped under `completed_lock` in __exit__ to tell
        # the polling thread to stop.
        self.completed = False
        self.completed_lock = threading.Lock()
        self.progress_thread = threading.Thread(
            target=self._update_progress_worker,
            name="progress-update",
            daemon=True,  # Avoids deadlock on exit in awkward error conditions
        )
        self.progress_thread.start()

    def _update_progress(self):
        # Advance the bar by however much the shared counter has grown
        # since the last poll.
        current = get_progress()
        inc = current - self.progress_bar.n
        self.progress_bar.update(inc)

    def _update_progress_worker(self):
        # Poll the counter until __exit__ signals completion.
        completed = False
        while not completed:
            self._update_progress()
            time.sleep(self.progress_config.poll_interval)
            with self.completed_lock:
                completed = self.completed
        logger.debug("Exit progress thread")

    def submit(self, *args, **kwargs):
        """Submit work to the executor and track its future."""
        future = self.executor.submit(*args, **kwargs)
        self.futures.add(future)
        return future

    def results_as_completed(self):
        """Yield results of all submitted work in completion order."""
        for future in cf.as_completed(self.futures):
            yield future.result()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type is None:
            wait_on_futures(self.futures)
        else:
            cancel_futures(self.futures)
        # There's probably a much cleaner way of doing this with a Condition
        # or something, but this seems to work OK for now. This setup might
        # make small conversions a bit laggy as we wait on the sleep interval
        # though.
        with self.completed_lock:
            self.completed = True
        self.executor.shutdown(wait=False)
        # FIXME there's currently some thing weird happening at the end of
        # Encode 1D for 1kg-p3. The progress bar disappears, like we're
        # setting a total of zero or something.
        self.progress_thread.join()
        # One final poll so work finished after the last loop iteration is
        # reflected in the bar before it closes.
        self._update_progress()
        self.progress_bar.close()
        return False
369

370

371
class JsonDataclass:
    """Mixin for dataclasses providing dict and indented-JSON views."""

    def asdict(self):
        """Return this dataclass as a recursively-converted dict."""
        return dataclasses.asdict(self)

    def asjson(self):
        """Return this dataclass serialised as indented JSON."""
        data = self.asdict()
        return json.dumps(data, indent=4)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc