sgkit-dev / bio2zarr, build 12312346477

13 Dec 2024 08:41AM UTC. Coverage: 98.25% (-0.7%) from 98.91%.

Pull Request #281: Draft bed2zarr code
Merge a398a9196 into 883a37e81 (github, web-flow)

138 of 154 new or added lines in 3 files covered (89.61%).
13 existing lines in 4 files are now uncovered.
2583 of 2629 relevant lines covered (98.25%).
0.98 hits per line.

Source File: /bio2zarr/vcf2zarr/vcz.py (98.4% covered)

import contextlib
import dataclasses
import json
import logging
import os
import os.path
import pathlib
import shutil
import tempfile

import humanfriendly
import numcodecs
import numpy as np
import zarr

from bio2zarr.zarr_utils import ZARR_FORMAT_KWARGS

from .. import constants, core, provenance
from . import icf

logger = logging.getLogger(__name__)


def inspect(path):
    path = pathlib.Path(path)
    # TODO add support for the Zarr format also
    if (path / "metadata.json").exists():
        obj = icf.IntermediateColumnarFormat(path)
    elif (path / ".zmetadata").exists():
        obj = VcfZarr(path)
    else:
        raise ValueError("Format not recognised")  # NEEDS TEST
    return obj.summary_table()


DEFAULT_ZARR_COMPRESSOR = numcodecs.Blosc(cname="zstd", clevel=7)

_fixed_field_descriptions = {
    "variant_contig": "An identifier from the reference genome or an angle-bracketed ID"
    " string pointing to a contig in the assembly file",
    "variant_position": "The reference position",
    "variant_length": "The length of the variant measured in bases",
    "variant_id": "List of unique identifiers where applicable",
    "variant_allele": "List of the reference and alternate alleles",
    "variant_quality": "Phred-scaled quality score",
    "variant_filter": "Filter status of the variant",
}


@dataclasses.dataclass
class ZarrArraySpec:
    name: str
    dtype: str
    shape: tuple
    chunks: tuple
    dimensions: tuple
    description: str
    vcf_field: str
    compressor: dict
    filters: list

    def __post_init__(self):
        if self.name in _fixed_field_descriptions:
            self.description = self.description or _fixed_field_descriptions[self.name]

        # Ensure these are tuples for ease of comparison and consistency
        self.shape = tuple(self.shape)
        self.chunks = tuple(self.chunks)
        self.dimensions = tuple(self.dimensions)
        self.filters = tuple(self.filters)

    @staticmethod
    def new(**kwargs):
        spec = ZarrArraySpec(
            **kwargs, compressor=DEFAULT_ZARR_COMPRESSOR.get_config(), filters=[]
        )
        spec._choose_compressor_settings()
        return spec

    @staticmethod
    def from_field(
        vcf_field,
        *,
        num_variants,
        num_samples,
        variants_chunk_size,
        samples_chunk_size,
        array_name=None,
    ):
        shape = [num_variants]
        prefix = "variant_"
        dimensions = ["variants"]
        chunks = [variants_chunk_size]
        if vcf_field.category == "FORMAT":
            prefix = "call_"
            shape.append(num_samples)
            chunks.append(samples_chunk_size)
            dimensions.append("samples")
        if array_name is None:
            array_name = prefix + vcf_field.name
        # TODO make an option to add in the empty extra dimension
        if vcf_field.summary.max_number > 1 or vcf_field.full_name == "FORMAT/LAA":
            shape.append(vcf_field.summary.max_number)
            chunks.append(vcf_field.summary.max_number)
            # TODO we should really be checking this to see if the named dimensions
            # are actually correct.
            if vcf_field.vcf_number == "R":
                dimensions.append("alleles")
            elif vcf_field.vcf_number == "A":
                dimensions.append("alt_alleles")
            elif vcf_field.vcf_number == "G":
                dimensions.append("genotypes")
            else:
                dimensions.append(f"{vcf_field.category}_{vcf_field.name}_dim")
        return ZarrArraySpec.new(
            vcf_field=vcf_field.full_name,
            name=array_name,
            dtype=vcf_field.smallest_dtype(),
            shape=shape,
            chunks=chunks,
            dimensions=dimensions,
            description=vcf_field.description,
        )

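    # A worked example of from_field (hypothetical field, numbers not from
    # this file): an INFO field with vcf_number="A" and summary.max_number=3,
    # over num_variants=100_000 and variants_chunk_size=10_000, produces
    #   shape=(100_000, 3), chunks=(10_000, 3),
    #   dimensions=("variants", "alt_alleles")
    # while a FORMAT field also picks up the samples dimension after variants.
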
    def _choose_compressor_settings(self):
        """
        Choose compressor and filter settings based on the size and
        type of the array, plus some heuristics from observed properties
        of VCFs.

        See https://github.com/pystatgen/bio2zarr/discussions/74
        """
        # Default is to not shuffle, because autoshuffle isn't recognised
        # by many Zarr implementations, and shuffling can lead to worse
        # performance in some cases anyway. Turning on shuffle should be a
        # deliberate choice.
        shuffle = numcodecs.Blosc.NOSHUFFLE
        if self.name == "call_genotype" and self.dtype == "i1":
            # call_genotype gets BITSHUFFLE by default as it gets
            # significantly better compression (at a cost of slower
            # decoding)
            shuffle = numcodecs.Blosc.BITSHUFFLE
        elif self.dtype == "bool":
            shuffle = numcodecs.Blosc.BITSHUFFLE

        self.compressor["shuffle"] = shuffle

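    # For reference, a sketch of the resulting compressor config for
    # call_genotype (the dict shape follows numcodecs.Blosc.get_config();
    # shown here for illustration only):
    #   {"id": "blosc", "cname": "zstd", "clevel": 7,
    #    "shuffle": numcodecs.Blosc.BITSHUFFLE, "blocksize": 0}
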
    @property
    def chunk_nbytes(self):
        """
        Returns the nbytes for a single chunk in this array.
        """
        items = 1
        dim = 0
        for chunk_size in self.chunks:
            size = min(chunk_size, self.shape[dim])
            items *= size
            dim += 1
        # Include sizes for extra dimensions.
        for size in self.shape[dim:]:
            items *= size
        dt = np.dtype(self.dtype)
        return items * dt.itemsize

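    # A worked example (hypothetical numbers): for call_genotype with
    # shape=(100_000, 1000, 2), chunks=(10_000, 1000) and dtype "i1",
    # items = 10_000 * 1000 * 2 = 20_000_000, so chunk_nbytes is 20 MB,
    # comfortably under the 2**31 - 1 Blosc buffer limit checked in
    # VcfZarrSchema.validate below.
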
    @property
    def variant_chunk_nbytes(self):
        """
        Returns the nbytes for a single variant chunk of this array.
        """
        chunk_items = self.chunks[0]
        for size in self.shape[1:]:
            chunk_items *= size
        dt = np.dtype(self.dtype)
        if dt.kind == "O" and "samples" in self.dimensions:
            logger.warning(
                f"Field {self.name} is a string; max memory usage may "
                "be a significant underestimate"
            )
        return chunk_items * dt.itemsize


ZARR_SCHEMA_FORMAT_VERSION = "0.4"


@dataclasses.dataclass
class VcfZarrSchema(core.JsonDataclass):
    format_version: str
    samples_chunk_size: int
    variants_chunk_size: int
    samples: list
    contigs: list
    filters: list
    fields: list

    def validate(self):
        """
        Checks that the schema is well-formed and within required limits.
        """
        for field in self.fields:
            # This is the Blosc max buffer size
            if field.chunk_nbytes > 2147483647:
                # TODO add some links to documentation here advising how to
                # deal with PL values.
                raise ValueError(
                    f"Field {field.name} chunks are too large "
                    f"({field.chunk_nbytes} > 2**31 - 1 bytes). "
                    "Either generate a schema and drop this field (if you don't "
                    "need it) or reduce the variant or sample chunk sizes."
                )
            # TODO other checks? There must be lots of ways people could mess
            # up the schema leading to cryptic errors.

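    # A sketch of how the Blosc limit can be hit in practice (hypothetical
    # numbers): a FORMAT/PL field with max_number=105 (14 alleles, diploid)
    # at chunks=(10_000, 100_000) and a 4-byte dtype gives
    # 10_000 * 100_000 * 105 * 4 bytes per chunk, far beyond 2**31 - 1, so
    # validate() raises and the chunk sizes (or the field) must be revisited.
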
    def field_map(self):
        return {field.name: field for field in self.fields}

    @staticmethod
    def fromdict(d):
        if d["format_version"] != ZARR_SCHEMA_FORMAT_VERSION:
            raise ValueError(
                "Zarr schema format version mismatch: "
                f"{d['format_version']} != {ZARR_SCHEMA_FORMAT_VERSION}"
            )
        ret = VcfZarrSchema(**d)
        ret.samples = [icf.Sample(**sd) for sd in d["samples"]]
        ret.contigs = [icf.Contig(**sd) for sd in d["contigs"]]
        ret.filters = [icf.Filter(**sd) for sd in d["filters"]]
        ret.fields = [ZarrArraySpec(**sd) for sd in d["fields"]]
        return ret

    @staticmethod
    def fromjson(s):
        return VcfZarrSchema.fromdict(json.loads(s))

    @staticmethod
    def generate(icf, variants_chunk_size=None, samples_chunk_size=None):
        m = icf.num_records
        n = icf.num_samples
        # FIXME
        if samples_chunk_size is None:
            samples_chunk_size = 1000
        if variants_chunk_size is None:
            variants_chunk_size = 10_000
        logger.info(
            f"Generating schema with chunks={variants_chunk_size, samples_chunk_size}"
        )

        def spec_from_field(field, array_name=None):
            return ZarrArraySpec.from_field(
                field,
                num_samples=n,
                num_variants=m,
                samples_chunk_size=samples_chunk_size,
                variants_chunk_size=variants_chunk_size,
                array_name=array_name,
            )

        def fixed_field_spec(
            name,
            dtype,
            vcf_field=None,
            shape=(m,),
            dimensions=("variants",),
            chunks=None,
        ):
            return ZarrArraySpec.new(
                vcf_field=vcf_field,
                name=name,
                dtype=dtype,
                shape=shape,
                description="",
                dimensions=dimensions,
                chunks=chunks or [variants_chunk_size],
            )

        alt_field = icf.fields["ALT"]
        max_alleles = alt_field.vcf_field.summary.max_number + 1

        array_specs = [
            fixed_field_spec(
                name="variant_contig",
                dtype=core.min_int_dtype(0, icf.metadata.num_contigs),
            ),
            fixed_field_spec(
                name="variant_filter",
                dtype="bool",
                shape=(m, icf.metadata.num_filters),
                dimensions=["variants", "filters"],
                chunks=(variants_chunk_size, icf.metadata.num_filters),
            ),
            fixed_field_spec(
                name="variant_allele",
                dtype="O",
                shape=(m, max_alleles),
                dimensions=["variants", "alleles"],
                chunks=(variants_chunk_size, max_alleles),
            ),
            fixed_field_spec(
                name="variant_id",
                dtype="O",
            ),
            fixed_field_spec(
                name="variant_id_mask",
                dtype="bool",
            ),
        ]
        name_map = {field.full_name: field for field in icf.metadata.fields}

        # Only three of the fixed fields have a direct one-to-one mapping.
        array_specs.extend(
            [
                spec_from_field(name_map["QUAL"], array_name="variant_quality"),
                spec_from_field(name_map["POS"], array_name="variant_position"),
                spec_from_field(name_map["rlen"], array_name="variant_length"),
            ]
        )
        array_specs.extend(
            [spec_from_field(field) for field in icf.metadata.info_fields]
        )

        gt_field = None
        for field in icf.metadata.format_fields:
            if field.name == "GT":
                gt_field = field
                continue
            array_specs.append(spec_from_field(field))

        if gt_field is not None:
            ploidy = max(gt_field.summary.max_number - 1, 1)
            shape = [m, n]
            chunks = [variants_chunk_size, samples_chunk_size]
            dimensions = ["variants", "samples"]
            array_specs.append(
                ZarrArraySpec.new(
                    vcf_field=None,
                    name="call_genotype_phased",
                    dtype="bool",
                    shape=list(shape),
                    chunks=list(chunks),
                    dimensions=list(dimensions),
                    description="",
                )
            )
            shape += [ploidy]
            chunks += [ploidy]
            dimensions += ["ploidy"]
            array_specs.append(
                ZarrArraySpec.new(
                    vcf_field=None,
                    name="call_genotype",
                    dtype=gt_field.smallest_dtype(),
                    shape=list(shape),
                    chunks=list(chunks),
                    dimensions=list(dimensions),
                    description="",
                )
            )
            array_specs.append(
                ZarrArraySpec.new(
                    vcf_field=None,
                    name="call_genotype_mask",
                    dtype="bool",
                    shape=list(shape),
                    chunks=list(chunks),
                    dimensions=list(dimensions),
                    description="",
                )
            )

        return VcfZarrSchema(
            format_version=ZARR_SCHEMA_FORMAT_VERSION,
            samples_chunk_size=samples_chunk_size,
            variants_chunk_size=variants_chunk_size,
            fields=array_specs,
            samples=icf.metadata.samples,
            contigs=icf.metadata.contigs,
            filters=icf.metadata.filters,
        )


class VcfZarr:
    def __init__(self, path):
        if not (path / ".zmetadata").exists():
            raise ValueError("Not in VcfZarr format")  # NEEDS TEST
        self.path = path
        self.root = zarr.open(path, mode="r")

    def summary_table(self):
        data = []
        arrays = [(core.du(self.path / a.basename), a) for _, a in self.root.arrays()]
        arrays.sort(key=lambda x: x[0])
        for stored, array in reversed(arrays):
            d = {
                "name": array.name,
                "dtype": str(array.dtype),
                "stored": core.display_size(stored),
                "size": core.display_size(array.nbytes),
                "ratio": core.display_number(array.nbytes / stored),
                "nchunks": str(array.nchunks),
                "chunk_size": core.display_size(array.nbytes / array.nchunks),
                "avg_chunk_stored": core.display_size(int(stored / array.nchunks)),
                "shape": str(array.shape),
                "chunk_shape": str(array.chunks),
                "compressor": str(array.compressor),
                "filters": str(array.filters),
            }
            data.append(d)
        return data


def parse_max_memory(max_memory):
    if max_memory is None:
        # Effectively unbounded
        return 2**63
    if isinstance(max_memory, str):
        max_memory = humanfriendly.parse_size(max_memory)
    logger.info(f"Set memory budget to {core.display_size(max_memory)}")
    return max_memory


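# For example, parse_max_memory("8G") would return 8_000_000_000
# (humanfriendly treats "G" as decimal; "8GiB" gives 2**33 bytes),
# and parse_max_memory(None) returns 2**63, i.e. effectively no limit.
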
@dataclasses.dataclass
class VcfZarrPartition:
    start: int
    stop: int

    @staticmethod
    def generate_partitions(num_records, chunk_size, num_partitions, max_chunks=None):
        num_chunks = int(np.ceil(num_records / chunk_size))
        if max_chunks is not None:
            num_chunks = min(num_chunks, max_chunks)
        partitions = []
        splits = np.array_split(np.arange(num_chunks), min(num_partitions, num_chunks))
        for chunk_slice in splits:
            start_chunk = int(chunk_slice[0])
            stop_chunk = int(chunk_slice[-1]) + 1
            start_index = start_chunk * chunk_size
            stop_index = min(stop_chunk * chunk_size, num_records)
            partitions.append(VcfZarrPartition(start_index, stop_index))
        return partitions


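# A worked example: generate_partitions(num_records=25, chunk_size=10,
# num_partitions=2) makes ceil(25 / 10) = 3 chunks, split as [0, 1] and [2],
# giving partitions [0, 20) and [20, 25). Partitions align to chunk
# boundaries, which keeps concurrent partition writers off each other's
# chunks.
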
VZW_METADATA_FORMAT_VERSION = "0.1"


@dataclasses.dataclass
class VcfZarrWriterMetadata(core.JsonDataclass):
    format_version: str
    icf_path: str
    schema: VcfZarrSchema
    dimension_separator: str
    partitions: list
    provenance: dict

    @staticmethod
    def fromdict(d):
        if d["format_version"] != VZW_METADATA_FORMAT_VERSION:
            raise ValueError(
                "VcfZarrWriter format version mismatch: "
                f"{d['format_version']} != {VZW_METADATA_FORMAT_VERSION}"
            )
        ret = VcfZarrWriterMetadata(**d)
        ret.schema = VcfZarrSchema.fromdict(ret.schema)
        ret.partitions = [VcfZarrPartition(**p) for p in ret.partitions]
        return ret


@dataclasses.dataclass
class VcfZarrWriteSummary(core.JsonDataclass):
    num_partitions: int
    num_samples: int
    num_variants: int
    num_chunks: int
    max_encoding_memory: str


class VcfZarrWriter:
    def __init__(self, path):
        self.path = pathlib.Path(path)
        self.wip_path = self.path / "wip"
        self.arrays_path = self.wip_path / "arrays"
        self.partitions_path = self.wip_path / "partitions"
        self.metadata = None
        self.icf = None

    @property
    def schema(self):
        return self.metadata.schema

    @property
    def num_partitions(self):
        return len(self.metadata.partitions)

    def has_genotypes(self):
        for field in self.schema.fields:
            if field.name == "call_genotype":
                return True
        return False

    #######################
    # init
    #######################

    def init(
        self,
        icf,
        *,
        target_num_partitions,
        schema,
        dimension_separator=None,
        max_variant_chunks=None,
    ):
        self.icf = icf
        if self.path.exists():
            raise ValueError("Zarr path already exists")  # NEEDS TEST
        schema.validate()
        partitions = VcfZarrPartition.generate_partitions(
            self.icf.num_records,
            schema.variants_chunk_size,
            target_num_partitions,
            max_chunks=max_variant_chunks,
        )
        # Default to using nested directories following the Zarr v3 default.
        # This seems to require version 2.17+ to work properly
        dimension_separator = (
            "/" if dimension_separator is None else dimension_separator
        )
        self.metadata = VcfZarrWriterMetadata(
            format_version=VZW_METADATA_FORMAT_VERSION,
            icf_path=str(self.icf.path),
            schema=schema,
            dimension_separator=dimension_separator,
            partitions=partitions,
            # Bare minimum here for provenance - see comments above
            provenance={"source": f"bio2zarr-{provenance.__version__}"},
        )

        self.path.mkdir()
        root = zarr.open(store=self.path, mode="a", **ZARR_FORMAT_KWARGS)
        root.attrs.update(
            {
                "vcf_zarr_version": "0.2",
                "vcf_header": self.icf.vcf_header,
                "source": f"bio2zarr-{provenance.__version__}",
            }
        )
        # Doing this synchronously - this is fine, surely
        self.encode_samples(root)
        self.encode_filter_id(root)
        self.encode_contig_id(root)

        self.wip_path.mkdir()
        self.arrays_path.mkdir()
        self.partitions_path.mkdir()
        root = zarr.open(store=self.arrays_path, mode="a", **ZARR_FORMAT_KWARGS)

        total_chunks = 0
        for field in self.schema.fields:
            a = self.init_array(root, field, partitions[-1].stop)
            total_chunks += a.nchunks

        logger.info("Writing WIP metadata")
        with open(self.wip_path / "metadata.json", "w") as f:
            json.dump(self.metadata.asdict(), f, indent=4)

        return VcfZarrWriteSummary(
            num_variants=self.icf.num_records,
            num_samples=self.icf.num_samples,
            num_partitions=self.num_partitions,
            num_chunks=total_chunks,
            max_encoding_memory=core.display_size(self.get_max_encoding_memory()),
        )

    def encode_samples(self, root):
        if self.schema.samples != self.icf.metadata.samples:
            raise ValueError("Subsetting or reordering samples not supported currently")
        array = root.array(
            "sample_id",
            data=[sample.id for sample in self.schema.samples],
            shape=len(self.schema.samples),
            dtype="str",
            compressor=DEFAULT_ZARR_COMPRESSOR,
            chunks=(self.schema.samples_chunk_size,),
        )
        array.attrs["_ARRAY_DIMENSIONS"] = ["samples"]
        logger.debug("Samples done")

    def encode_contig_id(self, root):
        array = root.array(
            "contig_id",
            data=[contig.id for contig in self.schema.contigs],
            shape=len(self.schema.contigs),
            dtype="str",
            compressor=DEFAULT_ZARR_COMPRESSOR,
        )
        array.attrs["_ARRAY_DIMENSIONS"] = ["contigs"]
        if all(contig.length is not None for contig in self.schema.contigs):
            array = root.array(
                "contig_length",
                data=[contig.length for contig in self.schema.contigs],
                shape=len(self.schema.contigs),
                dtype=np.int64,
                compressor=DEFAULT_ZARR_COMPRESSOR,
            )
            array.attrs["_ARRAY_DIMENSIONS"] = ["contigs"]

    def encode_filter_id(self, root):
        # TODO need a way to store description also
        # https://github.com/sgkit-dev/vcf-zarr-spec/issues/19
        array = root.array(
            "filter_id",
            data=[filt.id for filt in self.schema.filters],
            shape=len(self.schema.filters),
            dtype="str",
            compressor=DEFAULT_ZARR_COMPRESSOR,
        )
        array.attrs["_ARRAY_DIMENSIONS"] = ["filters"]

    def init_array(self, root, array_spec, variants_dim_size):
        object_codec = None
        if array_spec.dtype == "O":
            object_codec = numcodecs.VLenUTF8()
        shape = list(array_spec.shape)
        # Truncate the variants dimension if max_variant_chunks was specified
        shape[0] = variants_dim_size
        a = root.empty(
            name=array_spec.name,
            shape=shape,
            chunks=array_spec.chunks,
            dtype=array_spec.dtype,
            compressor=numcodecs.get_codec(array_spec.compressor),
            filters=[numcodecs.get_codec(filt) for filt in array_spec.filters],
            object_codec=object_codec,
            dimension_separator=self.metadata.dimension_separator,
            **ZARR_FORMAT_KWARGS,
        )
        a.attrs.update(
            {
                "description": array_spec.description,
                # Dimension names are part of the spec in Zarr v3
                "_ARRAY_DIMENSIONS": array_spec.dimensions,
            }
        )
        logger.debug(f"Initialised {a}")
        return a

    #######################
    # encode_partition
    #######################

    def load_metadata(self):
        if self.metadata is None:
            with open(self.wip_path / "metadata.json") as f:
                self.metadata = VcfZarrWriterMetadata.fromdict(json.load(f))
            self.icf = icf.IntermediateColumnarFormat(self.metadata.icf_path)

    def partition_path(self, partition_index):
        return self.partitions_path / f"p{partition_index}"

    def wip_partition_path(self, partition_index):
        return self.partitions_path / f"wip_p{partition_index}"

    def wip_partition_array_path(self, partition_index, name):
        return self.wip_partition_path(partition_index) / name

    def partition_array_path(self, partition_index, name):
        return self.partition_path(partition_index) / name

    def encode_partition(self, partition_index):
        self.load_metadata()
        if partition_index < 0 or partition_index >= self.num_partitions:
            raise ValueError("Partition index not in the valid range")
        partition_path = self.wip_partition_path(partition_index)
        partition_path.mkdir(exist_ok=True)
        logger.info(f"Encoding partition {partition_index} to {partition_path}")

        self.encode_id_partition(partition_index)
        self.encode_filters_partition(partition_index)
        self.encode_contig_partition(partition_index)
        self.encode_alleles_partition(partition_index)
        for array_spec in self.schema.fields:
            if array_spec.vcf_field is not None:
                self.encode_array_partition(array_spec, partition_index)
        if self.has_genotypes():
            self.encode_genotypes_partition(partition_index)

        final_path = self.partition_path(partition_index)
        logger.info(f"Finalising {partition_index} at {final_path}")
        if final_path.exists():
            logger.warning(f"Removing existing partition at {final_path}")
            shutil.rmtree(final_path)
        os.rename(partition_path, final_path)

    def init_partition_array(self, partition_index, name):
        # Create an empty array like the definition
        src = self.arrays_path / name
        # Overwrite any existing WIP files
        wip_path = self.wip_partition_array_path(partition_index, name)
        shutil.copytree(src, wip_path, dirs_exist_ok=True)
        array = zarr.open_array(store=wip_path, mode="a")
        logger.debug(f"Opened empty array {array.name} <{array.dtype}> @ {wip_path}")
        return array

    def finalise_partition_array(self, partition_index, name):
        logger.debug(f"Encoded {name} partition {partition_index}")

    def encode_array_partition(self, array_spec, partition_index):
        array = self.init_partition_array(partition_index, array_spec.name)

        partition = self.metadata.partitions[partition_index]
        ba = core.BufferedArray(array, partition.start)
        source_field = self.icf.fields[array_spec.vcf_field]
        sanitiser = source_field.sanitiser_factory(ba.buff.shape)

        for value in source_field.iter_values(partition.start, partition.stop):
            # We write directly into the buffer in the sanitiser function
            # to make it easier to reason about dimension padding
            j = ba.next_buffer_row()
            sanitiser(ba.buff, j, value)
        ba.flush()
        self.finalise_partition_array(partition_index, array_spec.name)

    def encode_genotypes_partition(self, partition_index):
        gt_array = self.init_partition_array(partition_index, "call_genotype")
        gt_mask_array = self.init_partition_array(partition_index, "call_genotype_mask")
        gt_phased_array = self.init_partition_array(
            partition_index, "call_genotype_phased"
        )

        partition = self.metadata.partitions[partition_index]
        gt = core.BufferedArray(gt_array, partition.start)
        gt_mask = core.BufferedArray(gt_mask_array, partition.start)
        gt_phased = core.BufferedArray(gt_phased_array, partition.start)

        source_field = self.icf.fields["FORMAT/GT"]
        for value in source_field.iter_values(partition.start, partition.stop):
            j = gt.next_buffer_row()
            icf.sanitise_value_int_2d(
                gt.buff, j, value[:, :-1] if value is not None else None
            )
            j = gt_phased.next_buffer_row()
            icf.sanitise_value_int_1d(
                gt_phased.buff, j, value[:, -1] if value is not None else None
            )
            # TODO check is this the correct semantics when we are padding
            # with mixed ploidies?
            j = gt_mask.next_buffer_row()
            gt_mask.buff[j] = gt.buff[j] < 0
        gt.flush()
        gt_phased.flush()
        gt_mask.flush()

        self.finalise_partition_array(partition_index, "call_genotype")
        self.finalise_partition_array(partition_index, "call_genotype_mask")
        self.finalise_partition_array(partition_index, "call_genotype_phased")

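    # To make the slicing above concrete (the flag encoding is inferred from
    # the slicing here, not documented in this file): each ICF GT value is a
    # 2D int array with one row per sample, ploidy columns first and the
    # phased flag last. Assuming a diploid call, a row for "0|1" would look
    # like [0, 1, 1] and one for "0/1" like [0, 1, 0], so value[:, :-1] is
    # the genotypes and value[:, -1] the phased flags.
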
    def encode_alleles_partition(self, partition_index):
        array_name = "variant_allele"
        alleles_array = self.init_partition_array(partition_index, array_name)
        partition = self.metadata.partitions[partition_index]
        alleles = core.BufferedArray(alleles_array, partition.start)
        ref_field = self.icf.fields["REF"]
        alt_field = self.icf.fields["ALT"]

        for ref, alt in zip(
            ref_field.iter_values(partition.start, partition.stop),
            alt_field.iter_values(partition.start, partition.stop),
        ):
            j = alleles.next_buffer_row()
            alleles.buff[j, :] = constants.STR_FILL
            alleles.buff[j, 0] = ref[0]
            alleles.buff[j, 1 : 1 + len(alt)] = alt
        alleles.flush()

        self.finalise_partition_array(partition_index, array_name)

    def encode_id_partition(self, partition_index):
        vid_array = self.init_partition_array(partition_index, "variant_id")
        vid_mask_array = self.init_partition_array(partition_index, "variant_id_mask")
        partition = self.metadata.partitions[partition_index]
        vid = core.BufferedArray(vid_array, partition.start)
        vid_mask = core.BufferedArray(vid_mask_array, partition.start)
        field = self.icf.fields["ID"]

        for value in field.iter_values(partition.start, partition.stop):
            j = vid.next_buffer_row()
            k = vid_mask.next_buffer_row()
            assert j == k
            if value is not None:
                vid.buff[j] = value[0]
                vid_mask.buff[j] = False
            else:
                vid.buff[j] = constants.STR_MISSING
                vid_mask.buff[j] = True
        vid.flush()
        vid_mask.flush()

        self.finalise_partition_array(partition_index, "variant_id")
        self.finalise_partition_array(partition_index, "variant_id_mask")

    def encode_filters_partition(self, partition_index):
        lookup = {filt.id: index for index, filt in enumerate(self.schema.filters)}
        array_name = "variant_filter"
        array = self.init_partition_array(partition_index, array_name)
        partition = self.metadata.partitions[partition_index]
        var_filter = core.BufferedArray(array, partition.start)

        field = self.icf.fields["FILTERS"]
        for value in field.iter_values(partition.start, partition.stop):
            j = var_filter.next_buffer_row()
            var_filter.buff[j] = False
            for f in value:
                try:
                    var_filter.buff[j, lookup[f]] = True
                except KeyError:
                    raise ValueError(
                        f"Filter '{f}' was not defined in the header."
                    ) from None
        var_filter.flush()

        self.finalise_partition_array(partition_index, array_name)

    def encode_contig_partition(self, partition_index):
        lookup = {contig.id: index for index, contig in enumerate(self.schema.contigs)}
        array_name = "variant_contig"
        array = self.init_partition_array(partition_index, array_name)
        partition = self.metadata.partitions[partition_index]
        contig = core.BufferedArray(array, partition.start)
        field = self.icf.fields["CHROM"]

        for value in field.iter_values(partition.start, partition.stop):
            j = contig.next_buffer_row()
            # Note: because we are using the indexes to define the lookups
            # and we always have an index, it seems that the contig lookup
            # will always succeed. However, if anyone ever does hit a KeyError
            # here, please do open an issue with a reproducible example!
            contig.buff[j] = lookup[value[0]]
        contig.flush()

        self.finalise_partition_array(partition_index, array_name)

    #######################
    # finalise
    #######################

    def finalise_array(self, name):
        logger.info(f"Finalising {name}")
        final_path = self.path / name
        if final_path.exists():
            # NEEDS TEST
            raise ValueError(f"Array {name} already exists")
        for partition in range(self.num_partitions):
            # Move all the files in partition dir to dest dir
            src = self.partition_array_path(partition, name)
            if not src.exists():
                # NEEDS TEST
                raise ValueError(f"Partition {partition} of {name} does not exist")
            dest = self.arrays_path / name
            # This is Zarr v2 specific. Chunks in v3 will start with a "c" prefix.
            chunk_files = [
                path for path in src.iterdir() if not path.name.startswith(".")
            ]
            # TODO check for a count of the number of files. If we require a
            # dimension_separator of "/" then we could make stronger assertions
            # here, as we'd always have num_variant_chunks
            logger.debug(
                f"Moving {len(chunk_files)} chunks for {name} partition {partition}"
            )
            for chunk_file in chunk_files:
                os.rename(chunk_file, dest / chunk_file.name)
        # Finally, once all the chunks have moved into the arrays dir,
        # we move it out of wip
        os.rename(self.arrays_path / name, self.path / name)
        core.update_progress(1)

    def finalise(self, show_progress=False):
        self.load_metadata()

        logger.info(f"Scanning {self.num_partitions} partitions")
        missing = []
        # TODO may need a progress bar here
        for partition_id in range(self.num_partitions):
            if not self.partition_path(partition_id).exists():
                missing.append(partition_id)
        if len(missing) > 0:
            raise FileNotFoundError(f"Partitions not encoded: {missing}")

        progress_config = core.ProgressConfig(
            total=len(self.schema.fields),
            title="Finalise",
            units="array",
            show=show_progress,
        )
        # NOTE: it's not clear that adding more workers will make this quicker,
        # as it's just going to be causing contention on the file system.
        # Something to check empirically in some deployments.
        # FIXME we're just using worker_processes=0 here to hook into the
        # SynchronousExecutor which is intended for testing purposes so
        # that we get test coverage. Should fix this either by allowing
        # for multiple workers, or making a standard wrapper for tqdm
        # that allows us to have a consistent look and feel.
        with core.ParallelWorkManager(0, progress_config) as pwm:
            for field in self.schema.fields:
                pwm.submit(self.finalise_array, field.name)
        logger.debug(f"Removing {self.wip_path}")
        shutil.rmtree(self.wip_path)
        logger.info("Consolidating Zarr metadata")
        zarr.consolidate_metadata(self.path)

    #######################
    # index
    #######################

    def create_index(self):
        """Create an index to support efficient region queries."""

        root = zarr.open_group(store=self.path, mode="r+")

        contig = root["variant_contig"]
        pos = root["variant_position"]
        length = root["variant_length"]

        assert contig.cdata_shape == pos.cdata_shape

        index = []

        logger.info("Creating region index")
        for v_chunk in range(pos.cdata_shape[0]):
            c = contig.blocks[v_chunk]
            p = pos.blocks[v_chunk]
            e = p + length.blocks[v_chunk] - 1

            # create a row for each contig in the chunk
            d = np.diff(c, append=-1)
            c_start_idx = 0
            for c_end_idx in np.nonzero(d)[0]:
                assert c[c_start_idx] == c[c_end_idx]
                index.append(
                    (
                        v_chunk,  # chunk index
                        c[c_start_idx],  # contig ID
                        p[c_start_idx],  # start
                        p[c_end_idx],  # end
                        np.max(e[c_start_idx : c_end_idx + 1]),  # max end
                        c_end_idx - c_start_idx + 1,  # num records
                    )
                )
                c_start_idx = c_end_idx + 1

        index = np.array(index, dtype=np.int32)
        array = root.array(
            "region_index",
            data=index,
            shape=index.shape,
            dtype=index.dtype,
            compressor=numcodecs.Blosc("zstd", clevel=9, shuffle=0),
            dimension_separator=self.metadata.dimension_separator,
        )
        array.attrs["_ARRAY_DIMENSIONS"] = [
            "region_index_values",
            "region_index_fields",
        ]

        logger.info("Consolidating Zarr metadata")
        zarr.consolidate_metadata(self.path)

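    # Each region_index row is (chunk index, contig ID, start, end, max end,
    # num records). For example (hypothetical values), a variant chunk that
    # holds records for contig 0 at positions 100-900 followed by contig 1
    # at 1000-5000 yields two rows, so a region query only needs to touch
    # chunks whose contig and position range overlap the query.
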
    #######################
    # encode_all_partitions
    #######################

    def get_max_encoding_memory(self):
        """
        Return the approximate maximum memory used to encode a variant chunk.
        """
        max_encoding_mem = 0
        for array_spec in self.schema.fields:
            max_encoding_mem = max(max_encoding_mem, array_spec.variant_chunk_nbytes)
        gt_mem = 0
        if self.has_genotypes():
            gt_mem = sum(
                field.variant_chunk_nbytes
                for field in self.schema.fields
                if field.name.startswith("call_genotype")
            )
        return max(max_encoding_mem, gt_mem)

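    # The three call_genotype* arrays are buffered together in
    # encode_genotypes_partition, so their variant-chunk sizes are summed;
    # every other field is encoded on its own, so the plain maximum suffices.
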
    def encode_all_partitions(
        self, *, worker_processes=1, show_progress=False, max_memory=None
    ):
        max_memory = parse_max_memory(max_memory)
        self.load_metadata()
        num_partitions = self.num_partitions
        per_worker_memory = self.get_max_encoding_memory()
        logger.info(
            f"Encoding Zarr over {num_partitions} partitions with "
            f"{worker_processes} workers and {core.display_size(per_worker_memory)} "
            "per worker"
        )
        # Each partition requires per_worker_memory bytes, so to prevent more than
        # max_memory being used, we clamp the number of workers
        max_num_workers = max_memory // per_worker_memory
        if max_num_workers < worker_processes:
            logger.warning(
                f"Limiting number of workers to {max_num_workers} to "
                "keep within specified memory budget of "
                f"{core.display_size(max_memory)}"
            )
        if max_num_workers <= 0:
            raise ValueError(
                "Insufficient memory to encode a partition: "
                f"{core.display_size(per_worker_memory)} > "
                f"{core.display_size(max_memory)}"
            )
        num_workers = min(max_num_workers, worker_processes)

        total_bytes = 0
        for array_spec in self.schema.fields:
            # Open the array definition to get the total size
            total_bytes += zarr.open(self.arrays_path / array_spec.name).nbytes

        progress_config = core.ProgressConfig(
            total=total_bytes,
            title="Encode",
            units="B",
            show=show_progress,
        )
        with core.ParallelWorkManager(num_workers, progress_config) as pwm:
            for partition_index in range(num_partitions):
                pwm.submit(self.encode_partition, partition_index)


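# A worked example of the worker clamp above (hypothetical sizes): with
# max_memory="4G" and get_max_encoding_memory() of 1.5 GB, at most
# 4_000_000_000 // 1_500_000_000 = 2 workers can run, so worker_processes=8
# is reduced to 2; with max_memory="1G" no worker fits and a ValueError is
# raised.
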
def mkschema(if_path, out, *, variants_chunk_size=None, samples_chunk_size=None):
    store = icf.IntermediateColumnarFormat(if_path)
    spec = VcfZarrSchema.generate(
        store,
        variants_chunk_size=variants_chunk_size,
        samples_chunk_size=samples_chunk_size,
    )
    out.write(spec.asjson())


def encode(
    if_path,
    zarr_path,
    schema_path=None,
    variants_chunk_size=None,
    samples_chunk_size=None,
    max_variant_chunks=None,
    dimension_separator=None,
    max_memory=None,
    worker_processes=1,
    show_progress=False,
):
    # Rough heuristic to split work up enough to keep utilisation high
    target_num_partitions = max(1, worker_processes * 4)
    encode_init(
        if_path,
        zarr_path,
        target_num_partitions,
        schema_path=schema_path,
        variants_chunk_size=variants_chunk_size,
        samples_chunk_size=samples_chunk_size,
        max_variant_chunks=max_variant_chunks,
        dimension_separator=dimension_separator,
    )
    vzw = VcfZarrWriter(zarr_path)
    vzw.encode_all_partitions(
        worker_processes=worker_processes,
        show_progress=show_progress,
        max_memory=max_memory,
    )
    vzw.finalise(show_progress)
    vzw.create_index()


def encode_init(
    icf_path,
    zarr_path,
    target_num_partitions,
    *,
    schema_path=None,
    variants_chunk_size=None,
    samples_chunk_size=None,
    max_variant_chunks=None,
    dimension_separator=None,
    max_memory=None,
    worker_processes=1,
    show_progress=False,
):
    icf_store = icf.IntermediateColumnarFormat(icf_path)
    if schema_path is None:
        schema = VcfZarrSchema.generate(
            icf_store,
            variants_chunk_size=variants_chunk_size,
            samples_chunk_size=samples_chunk_size,
        )
    else:
        logger.info(f"Reading schema from {schema_path}")
        if variants_chunk_size is not None or samples_chunk_size is not None:
            raise ValueError(
                "Cannot specify schema along with chunk sizes"
            )  # NEEDS TEST
        with open(schema_path) as f:
            schema = VcfZarrSchema.fromjson(f.read())
    zarr_path = pathlib.Path(zarr_path)
    vzw = VcfZarrWriter(zarr_path)
    return vzw.init(
        icf_store,
        target_num_partitions=target_num_partitions,
        schema=schema,
        dimension_separator=dimension_separator,
        max_variant_chunks=max_variant_chunks,
    )


def encode_partition(zarr_path, partition):
    writer = VcfZarrWriter(zarr_path)
    writer.encode_partition(partition)


def encode_finalise(zarr_path, show_progress=False):
    writer = VcfZarrWriter(zarr_path)
    writer.finalise(show_progress=show_progress)


def convert(
    vcfs,
    out_path,
    *,
    variants_chunk_size=None,
    samples_chunk_size=None,
    worker_processes=1,
    local_alleles=None,
    show_progress=False,
    icf_path=None,
):
    if icf_path is None:
        cm = temp_icf_path(prefix="vcf2zarr")
    else:
        cm = contextlib.nullcontext(icf_path)

    with cm as icf_path:
        icf.explode(
            icf_path,
            vcfs,
            worker_processes=worker_processes,
            show_progress=show_progress,
            local_alleles=local_alleles,
        )
        encode(
            icf_path,
            out_path,
            variants_chunk_size=variants_chunk_size,
            samples_chunk_size=samples_chunk_size,
            worker_processes=worker_processes,
            show_progress=show_progress,
        )


@contextlib.contextmanager
def temp_icf_path(prefix=None):
    with tempfile.TemporaryDirectory(prefix=prefix) as tmp:
        yield pathlib.Path(tmp) / "icf"
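

# A minimal usage sketch of the public entry points in this module, assuming
# an indexed "sample.vcf.gz" exists (file names here are illustrative only):
#
#     from bio2zarr.vcf2zarr import vcz
#
#     # One-shot conversion via a temporary intermediate columnar format
#     vcz.convert(["sample.vcf.gz"], "sample.vcz", worker_processes=4)
#
#     # Or the staged route: explode to ICF, tune the schema, then encode
#     with open("schema.json", "w") as out:
#         vcz.mkschema("sample.icf", out)
#     vcz.encode("sample.icf", "sample.vcz", schema_path="schema.json")
#
# Note that mkschema/encode expect an existing ICF directory (created by
# icf.explode), while convert drives the whole pipeline end to end.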