sgkit-dev / bio2zarr, build 18940786519

30 Oct 2025 12:34PM UTC. Coverage: 98.361% (+0.07%) from 98.292%.
Pull Request #422: Update intro.md with VCF Zarr conversion info (merge b33cd4646 into 3f955cef0)
2880 of 2928 relevant lines covered (98.36%), 3.93 hits per line.

Source file: /bio2zarr/vcz.py (95.65% covered)
import abc
import dataclasses
import json
import logging
import os
import pathlib
import shutil

import numcodecs
import numpy as np
import zarr

from bio2zarr import constants, core, provenance, zarr_utils

logger = logging.getLogger(__name__)

ZARR_SCHEMA_FORMAT_VERSION = "0.6"
DEFAULT_VARIANT_CHUNK_SIZE = 1000
DEFAULT_SAMPLE_CHUNK_SIZE = 10_000
DEFAULT_ZARR_COMPRESSOR = numcodecs.Blosc(cname="zstd", clevel=7)
DEFAULT_ZARR_COMPRESSOR_GENOTYPES = numcodecs.Blosc(
    cname="zstd", clevel=7, shuffle=numcodecs.Blosc.BITSHUFFLE
)
DEFAULT_ZARR_COMPRESSOR_BOOL = numcodecs.Blosc(
    cname="zstd", clevel=7, shuffle=numcodecs.Blosc.BITSHUFFLE
)

_fixed_field_descriptions = {
    "variant_contig": "An identifier from the reference genome or an angle-bracketed ID"
    " string pointing to a contig in the assembly file",
    "variant_position": "The reference position",
    "variant_length": "The length of the variant measured in bases",
    "variant_id": "List of unique identifiers where applicable",
    "variant_allele": "List of the reference and alternate alleles",
    "variant_quality": "Phred-scaled quality score",
    "variant_filter": "Filter status of the variant",
}


@dataclasses.dataclass
class VariantData:
    """Represents variant data returned by iter_alleles_and_genotypes."""

    variant_length: int
    alleles: np.ndarray
    genotypes: np.ndarray
    phased: np.ndarray


class Source(abc.ABC):
    @property
    @abc.abstractmethod
    def path(self):
        pass

    @property
    @abc.abstractmethod
    def num_records(self):
        pass

    @property
    @abc.abstractmethod
    def num_samples(self):
        pass

    @property
    @abc.abstractmethod
    def samples(self):
        pass

    @property
    def contigs(self):
        return None

    @property
    def filters(self):
        return None

    @property
    def root_attrs(self):
        return {}

    @abc.abstractmethod
    def iter_alleles_and_genotypes(self, start, stop, shape, num_alleles):
        pass

    def iter_id(self, start, stop):
        return

    def iter_contig(self, start, stop):
        return

    @abc.abstractmethod
    def iter_field(self, field_name, shape, start, stop):
        pass

    @abc.abstractmethod
    def generate_schema(self, variants_chunk_size, samples_chunk_size, local_alleles):
        pass


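# Hedged sketch (not part of the original module) of what a concrete Source
# implementation might look like; the names here ("TupleSource", "recs") are
# hypothetical, and the real implementations live elsewhere in bio2zarr.
#
#   class TupleSource(Source):
#       """Serves records from an in-memory list of VariantData objects."""
#
#       def __init__(self, recs, sample_ids):
#           self._recs = recs
#           self._samples = [Sample(id=s) for s in sample_ids]
#
#       @property
#       def path(self):
#           return "in-memory"
#
#       @property
#       def num_records(self):
#           return len(self._recs)
#
#       @property
#       def num_samples(self):
#           return len(self._samples)
#
#       @property
#       def samples(self):
#           return self._samples
#
#       def iter_alleles_and_genotypes(self, start, stop, shape, num_alleles):
#           yield from self._recs[start:stop]
#
#       def iter_field(self, field_name, shape, start, stop):
#           raise NotImplementedError
#
#       def generate_schema(
#           self, variants_chunk_size, samples_chunk_size, local_alleles
#       ):
#           raise NotImplementedError

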
@dataclasses.dataclass
class VcfZarrDimension:
    size: int
    chunk_size: int

    def asdict(self):
        return dataclasses.asdict(self)

    @classmethod
    def fromdict(cls, d):
        return cls(**d)

    @classmethod
    def unchunked(cls, size):
        return cls(size, max(size, 1))


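# Note (illustrative): unchunked() clamps chunk_size to at least 1, since Zarr
# chunks cannot be zero-sized:
#
#   VcfZarrDimension.unchunked(4)  # VcfZarrDimension(size=4, chunk_size=4)
#   VcfZarrDimension.unchunked(0)  # VcfZarrDimension(size=0, chunk_size=1)

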
def standard_dimensions(
    *,
    variants_size,
    samples_size,
    variants_chunk_size=None,
    samples_chunk_size=None,
    alleles_size=None,
    filters_size=None,
    ploidy_size=None,
    genotypes_size=None,
):
    """
    Returns a dictionary mapping dimension names to definitions for the
    standard fields in a VCF.
    """
    if variants_chunk_size is None:
        variants_chunk_size = max(1, min(variants_size, DEFAULT_VARIANT_CHUNK_SIZE))
    if samples_chunk_size is None:
        samples_chunk_size = max(1, min(samples_size, DEFAULT_SAMPLE_CHUNK_SIZE))

    dimensions = {
        "variants": VcfZarrDimension(variants_size, variants_chunk_size),
        "samples": VcfZarrDimension(samples_size, samples_chunk_size),
    }

    if alleles_size is not None:
        dimensions["alleles"] = VcfZarrDimension.unchunked(alleles_size)
        if alleles_size > 1:
            dimensions["alt_alleles"] = VcfZarrDimension.unchunked(alleles_size - 1)

    if filters_size is not None:
        dimensions["filters"] = VcfZarrDimension.unchunked(filters_size)

    if ploidy_size is not None:
        dimensions["ploidy"] = VcfZarrDimension.unchunked(ploidy_size)

    if genotypes_size is not None:
        dimensions["genotypes"] = VcfZarrDimension.unchunked(genotypes_size)

    return dimensions


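# Worked example (illustrative values): for 10_000 variants, 3 samples and up
# to 4 alleles,
#
#   dims = standard_dimensions(
#       variants_size=10_000, samples_size=3, alleles_size=4, ploidy_size=2
#   )
#
# yields "variants" chunked at the default 1000, "samples" chunked at 3, and
# unchunked "alleles" (4), "alt_alleles" (3) and "ploidy" (2) dimensions.

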
@dataclasses.dataclass
class ZarrArraySpec:
    name: str
    dtype: str
    dimensions: tuple
    description: str
    compressor: dict = None
    filters: list = None
    source: str = None

    def __post_init__(self):
        if self.name in _fixed_field_descriptions:
            self.description = self.description or _fixed_field_descriptions[self.name]

        self.dimensions = tuple(self.dimensions)
        self.filters = tuple(self.filters) if self.filters is not None else None

    def get_shape(self, schema):
        return schema.get_shape(self.dimensions)

    def get_chunks(self, schema):
        return schema.get_chunks(self.dimensions)

    def get_chunk_nbytes(self, schema):
        element_size = np.dtype(self.dtype).itemsize
        chunks = self.get_chunks(schema)
        shape = self.get_shape(schema)

        # Calculate actual chunk size accounting for dimension limits
        items = 1
        for i, chunk_size in enumerate(chunks):
            items *= min(chunk_size, shape[i])

        # Include sizes for extra dimensions (if any)
        if len(shape) > len(chunks):
            for size in shape[len(chunks) :]:
                items *= size

        return element_size * items

    @staticmethod
    def from_field(
        vcf_field,
        schema,
        *,
        array_name=None,
        compressor=None,
        filters=None,
    ):
        prefix = "variant_"
        dimensions = ["variants"]
        if vcf_field.category == "FORMAT":
            prefix = "call_"
            dimensions.append("samples")
        if array_name is None:
            array_name = prefix + vcf_field.name

        max_number = vcf_field.max_number
        if vcf_field.vcf_number == "R":
            max_alleles = schema.dimensions["alleles"].size
            if max_number > max_alleles:
                raise ValueError(
                    f"Max number of values {max_number} exceeds max alleles "
                    f"{max_alleles} for {vcf_field.full_name}"
                )
            if max_alleles > 0:
                dimensions.append("alleles")
        elif vcf_field.vcf_number == "A":
            max_alt_alleles = schema.dimensions["alt_alleles"].size
            if max_number > max_alt_alleles:
                raise ValueError(
                    f"Max number of values {max_number} exceeds max alt alleles "
                    f"{max_alt_alleles} for {vcf_field.full_name}"
                )
            if max_alt_alleles > 0:
                dimensions.append("alt_alleles")
        elif vcf_field.vcf_number == "G":
            max_genotypes = schema.dimensions["genotypes"].size
            if max_number > max_genotypes:
                raise ValueError(
                    f"Max number of values {max_number} exceeds max genotypes "
                    f"{max_genotypes} for {vcf_field.full_name}"
                )
            if max_genotypes > 0:
                dimensions.append("genotypes")
        elif max_number > 1 or vcf_field.full_name == "FORMAT/LAA":
            dimensions.append(f"{vcf_field.category}_{vcf_field.name}_dim")
        if dimensions[-1] not in schema.dimensions:
            schema.dimensions[dimensions[-1]] = VcfZarrDimension.unchunked(
                vcf_field.max_number
            )

        return ZarrArraySpec(
            source=vcf_field.full_name,
            name=array_name,
            dtype=vcf_field.smallest_dtype(),
            dimensions=dimensions,
            description=vcf_field.description,
            compressor=compressor,
            filters=filters,
        )

    def chunk_nbytes(self, schema):
        """
        Returns the nbytes for a single chunk in this array.
        """
        items = 1
        dim = 0
        for chunk_size in self.get_chunks(schema):
            size = min(chunk_size, self.get_shape(schema)[dim])
            items *= size
            dim += 1
        # Include sizes for extra dimensions.
        for size in self.get_shape(schema)[dim:]:
            items *= size
        dt = np.dtype(self.dtype)
        return items * dt.itemsize

    def variant_chunk_nbytes(self, schema):
        """
        Returns the nbytes for a single variant chunk of this array.
        """
        chunk_items = self.get_chunks(schema)[0]
        for size in self.get_shape(schema)[1:]:
            chunk_items *= size
        dt = np.dtype(self.dtype)
        if dt.kind == "O" and "samples" in self.dimensions:
            logger.warning(
                f"Field {self.name} is a string; max memory usage may "
                "be a significant underestimate"
            )
        return chunk_items * dt.itemsize


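# Recap of the dimension mapping implemented in from_field above: INFO fields
# use ("variants", ...) and FORMAT fields ("variants", "samples", ...); then
# Number=R appends "alleles", Number=A appends "alt_alleles", Number=G appends
# "genotypes", and any other multi-valued field gets its own
# "{category}_{name}_dim" dimension sized to the observed maximum.

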
@dataclasses.dataclass
class Contig:
    id: str
    length: int = None


@dataclasses.dataclass
class Sample:
    id: str


@dataclasses.dataclass
class Filter:
    id: str
    description: str = ""


@dataclasses.dataclass
class VcfZarrSchema(core.JsonDataclass):
    format_version: str
    dimensions: dict
    fields: list
    defaults: dict

    def __init__(
        self,
        format_version: str,
        fields: list,
        dimensions: dict,
        defaults: dict = None,
    ):
        self.format_version = format_version
        self.fields = fields
        defaults = defaults.copy() if defaults is not None else {}
        if defaults.get("compressor", None) is None:
            defaults["compressor"] = DEFAULT_ZARR_COMPRESSOR.get_config()
        if defaults.get("filters", None) is None:
            defaults["filters"] = []
        self.defaults = defaults
        self.dimensions = dimensions

    def get_shape(self, dimensions):
        return [self.dimensions[dim].size for dim in dimensions]

    def get_chunks(self, dimensions):
        return [self.dimensions[dim].chunk_size for dim in dimensions]

    def validate(self):
        """
        Checks that the schema is well-formed and within required limits.
        """
        for field in self.fields:
            for dim in field.dimensions:
                if dim not in self.dimensions:
                    raise ValueError(
                        f"Dimension '{dim}' used in field '{field.name}' is "
                        "not defined in the schema"
                    )

            chunk_nbytes = field.get_chunk_nbytes(self)
            # This is the Blosc max buffer size
            if chunk_nbytes > 2147483647:
                raise ValueError(
                    f"Field {field.name} chunks are too large "
                    f"({chunk_nbytes} > 2**31 - 1 bytes). "
                    "Either generate a schema and drop this field (if you don't "
                    "need it) or reduce the variant or sample chunk sizes."
                )
            # TODO other checks? There must be lots of ways people could mess
            # up the schema leading to cryptic errors.

    def field_map(self):
        return {field.name: field for field in self.fields}

    @staticmethod
    def fromdict(d):
        if d["format_version"] != ZARR_SCHEMA_FORMAT_VERSION:
            raise ValueError(
                "Zarr schema format version mismatch: "
                f"{d['format_version']} != {ZARR_SCHEMA_FORMAT_VERSION}"
            )

        ret = VcfZarrSchema(**d)
        ret.fields = [ZarrArraySpec(**sd) for sd in d["fields"]]
        ret.dimensions = {
            k: VcfZarrDimension.fromdict(v) for k, v in d["dimensions"].items()
        }

        return ret

    @staticmethod
    def fromjson(s):
        return VcfZarrSchema.fromdict(json.loads(s))


def sanitise_int_array(value, ndmin, dtype):
    if isinstance(value, tuple):
        value = [
            constants.VCF_INT_MISSING if x is None else x for x in value
        ]  # NEEDS TEST
    value = np.array(value, ndmin=ndmin, copy=True)
    value[value == constants.VCF_INT_MISSING] = -1
    value[value == constants.VCF_INT_FILL] = -2
    # TODO watch out for clipping here!
    return value.astype(dtype)


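# Illustrative example (sentinel names from bio2zarr.constants): missing
# values map to -1 and fill values to -2 before the dtype cast, so a raw
# tuple like (0, 1, None) with ndmin=1 becomes array([0, 1, -1]) in the
# requested dtype.

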
def compute_la_field(genotypes):
    """
    Computes the value of the LA field for each sample given the genotypes
    for a variant. The LA field lists the unique alleles observed for
    each sample, including the REF.
    """
    v = 2**31 - 1
    if np.any(genotypes >= v):
        raise ValueError("Extreme allele value not supported")
    G = genotypes.astype(np.int32)
    if len(G) > 0:
        # Anything < 0 gets mapped to -2 (pad) in the output, which comes last.
        # So, to get this sorting correctly, we remap to the largest value for
        # sorting, then map back. We promote the genotypes up to 32 bit for
        # convenience here, assuming that we'll never have an allele of 2**31 - 1.
        assert np.all(G != v)
        G[G < 0] = v
        G.sort(axis=1)
        G[G[:, 0] == G[:, 1], 1] = -2
        # Equal values result in padding also
        G[G == v] = -2
    return G.astype(genotypes.dtype)


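# Worked example (illustrative values): for diploid genotype rows
#
#   genotypes = [[0, 0], [1, 0], [2, 2], [0, -1]]
#
# compute_la_field gives
#
#   [[0, -2],   # hom ref: one distinct allele, padded with -2
#    [0, 1],    # het: distinct alleles, sorted ascending
#    [2, -2],   # hom alt
#    [0, -2]]   # missing (-1) maps to the pad value -2

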
def compute_lad_field(ad, la):
    """
    Computes the value of the LAD field for each sample given the AD values
    and the local alleles (LA) for a variant.
    """
    assert ad.shape[0] == la.shape[0]
    assert la.shape[1] == 2
    lad = np.full((ad.shape[0], 2), -2, dtype=ad.dtype)
    homs = np.where((la[:, 0] != -2) & (la[:, 1] == -2))
    lad[homs, 0] = ad[homs, la[homs, 0]]
    hets = np.where(la[:, 1] != -2)
    lad[hets, 0] = ad[hets, la[hets, 0]]
    lad[hets, 1] = ad[hets, la[hets, 1]]
    return lad


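# Worked example (illustrative values): each row keeps only the depths of the
# (up to two) local alleles,
#
#   ad = np.array([[10, 0, 5], [7, 3, 0]])
#   la = np.array([[0, 2], [0, -2]])
#   compute_lad_field(ad, la)  # [[10, 5], [7, -2]]

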
def pl_index(a, b):
    """
    Returns the PL index for alleles a and b.
    """
    return b * (b + 1) // 2 + a


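# Worked example (illustrative): diploid PLs are stored in the VCF "G"
# ordering 0/0, 0/1, 1/1, 0/2, 1/2, 2/2, ..., so with a <= b:
#
#   pl_index(0, 0)  # 0 -> genotype 0/0
#   pl_index(0, 1)  # 1 -> genotype 0/1
#   pl_index(1, 1)  # 2 -> genotype 1/1
#   pl_index(0, 2)  # 3 -> genotype 0/2
#   pl_index(2, 2)  # 5 -> genotype 2/2

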
def compute_lpl_field(pl, la):
    """
    Computes the value of the LPL field for each sample given the PL values
    and the local alleles (LA) for a variant.
    """
    lpl = np.full((pl.shape[0], 3), -2, dtype=pl.dtype)

    homs = np.where((la[:, 0] != -2) & (la[:, 1] == -2))
    a = la[homs, 0]
    lpl[homs, 0] = pl[homs, pl_index(a, a)]

    hets = np.where(la[:, 1] != -2)[0]
    a = la[hets, 0]
    b = la[hets, 1]
    lpl[hets, 0] = pl[hets, pl_index(a, a)]
    lpl[hets, 1] = pl[hets, pl_index(a, b)]
    lpl[hets, 2] = pl[hets, pl_index(b, b)]

    return lpl


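# Worked example (illustrative values): with 3 alleles there are 6 PL values
# in "G" order (0/0, 0/1, 1/1, 0/2, 1/2, 2/2); local alleles la = [[0, 2]]
# select the 0/0, 0/2 and 2/2 entries:
#
#   pl = np.array([[0, 30, 40, 10, 50, 20]])
#   la = np.array([[0, 2]])
#   compute_lpl_field(pl, la)  # [[0, 10, 20]]

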
@dataclasses.dataclass
class LocalisableFieldDescriptor:
    array_name: str
    vcf_field: str
    sanitise: callable
    convert: callable


localisable_fields = [
    LocalisableFieldDescriptor(
        "call_LAD", "FORMAT/AD", sanitise_int_array, compute_lad_field
    ),
    LocalisableFieldDescriptor(
        "call_LPL", "FORMAT/PL", sanitise_int_array, compute_lpl_field
    ),
]


@dataclasses.dataclass
class VcfZarrPartition:
    start: int
    stop: int

    @staticmethod
    def generate_partitions(num_records, chunk_size, num_partitions, max_chunks=None):
        num_chunks = int(np.ceil(num_records / chunk_size))
        if max_chunks is not None:
            num_chunks = min(num_chunks, max_chunks)
        partitions = []
        splits = np.array_split(np.arange(num_chunks), min(num_partitions, num_chunks))
        for chunk_slice in splits:
            start_chunk = int(chunk_slice[0])
            stop_chunk = int(chunk_slice[-1]) + 1
            start_index = start_chunk * chunk_size
            stop_index = min(stop_chunk * chunk_size, num_records)
            partitions.append(VcfZarrPartition(start_index, stop_index))
        return partitions


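# Worked example (illustrative values): 25 records in chunks of 10 give 3
# chunks; split over 2 partitions, the first gets two chunks and the second
# gets one, with the final stop clamped to num_records:
#
#   VcfZarrPartition.generate_partitions(25, 10, 2)
#   # [VcfZarrPartition(start=0, stop=20), VcfZarrPartition(start=20, stop=25)]

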
VZW_METADATA_FORMAT_VERSION = "0.1"


@dataclasses.dataclass
class VcfZarrWriterMetadata(core.JsonDataclass):
    format_version: str
    source_path: str
    schema: VcfZarrSchema
    dimension_separator: str
    partitions: list
    provenance: dict

    @staticmethod
    def fromdict(d):
        if d["format_version"] != VZW_METADATA_FORMAT_VERSION:
            raise ValueError(
                "VcfZarrWriter format version mismatch: "
                f"{d['format_version']} != {VZW_METADATA_FORMAT_VERSION}"
            )
        ret = VcfZarrWriterMetadata(**d)
        ret.schema = VcfZarrSchema.fromdict(ret.schema)
        ret.partitions = [VcfZarrPartition(**p) for p in ret.partitions]
        return ret


@dataclasses.dataclass
class VcfZarrWriteSummary(core.JsonDataclass):
    num_partitions: int
    num_samples: int
    num_variants: int
    num_chunks: int
    max_encoding_memory: str


class VcfZarrWriter:
4✔
536
    def __init__(self, source_type, path):
4✔
537
        self.source_type = source_type
4✔
538
        self.path = pathlib.Path(path)
4✔
539
        self.wip_path = self.path / "wip"
4✔
540
        self.arrays_path = self.wip_path / "arrays"
4✔
541
        self.partitions_path = self.wip_path / "partitions"
4✔
542
        self.metadata = None
4✔
543
        self.source = None
4✔
544

545
    @property
4✔
546
    def schema(self):
4✔
547
        return self.metadata.schema
4✔
548

549
    @property
4✔
550
    def num_partitions(self):
4✔
551
        return len(self.metadata.partitions)
4✔
552

553
    def has_genotypes(self):
4✔
554
        for field in self.schema.fields:
4✔
555
            if field.name == "call_genotype":
4✔
556
                return True
4✔
557
        return False
4✔
558

559
    def has_local_alleles(self):
4✔
560
        for field in self.schema.fields:
4✔
561
            if field.name == "call_LA" and field.source is None:
4✔
562
                return True
4✔
563
        return False
4✔
564

565
    #######################
566
    # init
567
    #######################
568

    def init(
        self,
        source,
        *,
        target_num_partitions,
        schema,
        dimension_separator=None,
        max_variant_chunks=None,
    ):
        self.source = source
        if self.path.exists():
            raise ValueError("Zarr path already exists")  # NEEDS TEST
        schema.validate()
        partitions = VcfZarrPartition.generate_partitions(
            self.source.num_records,
            schema.get_chunks(["variants"])[0],
            target_num_partitions,
            max_chunks=max_variant_chunks,
        )
        # Default to using nested directories following the Zarr v3 default.
        # This seems to require version 2.17+ to work properly
        dimension_separator = (
            "/" if dimension_separator is None else dimension_separator
        )
        self.metadata = VcfZarrWriterMetadata(
            format_version=VZW_METADATA_FORMAT_VERSION,
            source_path=str(self.source.path),
            schema=schema,
            dimension_separator=dimension_separator,
            partitions=partitions,
            # Bare minimum here for provenance - see comments above
            provenance={"source": f"bio2zarr-{provenance.__version__}"},
        )

        self.path.mkdir()
        root = zarr.open(store=self.path, mode="a", **zarr_utils.ZARR_FORMAT_KWARGS)
        root.attrs.update(
            {
                "vcf_zarr_version": "0.4",
                "source": f"bio2zarr-{provenance.__version__}",
            }
        )
        root.attrs.update(self.source.root_attrs)

        # Doing this synchronously - this is fine surely
        self.encode_samples(root)
        if self.source.filters is not None:
            self.encode_filters(root)
        if self.source.contigs is not None:
            self.encode_contigs(root)

        self.wip_path.mkdir()
        self.arrays_path.mkdir()
        self.partitions_path.mkdir()
        root = zarr.open(
            store=self.arrays_path, mode="a", **zarr_utils.ZARR_FORMAT_KWARGS
        )

        total_chunks = 0
        for field in self.schema.fields:
            a = self.init_array(root, self.metadata.schema, field, partitions[-1].stop)
            total_chunks += a.nchunks

        logger.info("Writing WIP metadata")
        with open(self.wip_path / "metadata.json", "w") as f:
            json.dump(self.metadata.asdict(), f, indent=4)

        return VcfZarrWriteSummary(
            num_variants=self.source.num_records,
            num_samples=self.source.num_samples,
            num_partitions=self.num_partitions,
            num_chunks=total_chunks,
            max_encoding_memory=core.display_size(self.get_max_encoding_memory()),
        )

    def encode_samples(self, root):
        samples = self.source.samples
        array = root.array(
            "sample_id",
            data=[sample.id for sample in samples],
            shape=len(samples),
            dtype="str",
            compressor=DEFAULT_ZARR_COMPRESSOR,
            chunks=(self.schema.get_chunks(["samples"])[0],),
        )
        array.attrs["_ARRAY_DIMENSIONS"] = ["samples"]
        logger.debug("Samples done")

    def encode_contigs(self, root):
        contigs = self.source.contigs
        array = root.array(
            "contig_id",
            data=[contig.id for contig in contigs],
            shape=len(contigs),
            dtype="str",
            compressor=DEFAULT_ZARR_COMPRESSOR,
        )
        array.attrs["_ARRAY_DIMENSIONS"] = ["contigs"]
        if all(contig.length is not None for contig in contigs):
            array = root.array(
                "contig_length",
                data=[contig.length for contig in contigs],
                shape=len(contigs),
                dtype=np.int64,
                compressor=DEFAULT_ZARR_COMPRESSOR,
            )
            array.attrs["_ARRAY_DIMENSIONS"] = ["contigs"]

    def encode_filters(self, root):
        filters = self.source.filters
        array = root.array(
            "filter_id",
            data=[filt.id for filt in filters],
            shape=len(filters),
            dtype="str",
            compressor=DEFAULT_ZARR_COMPRESSOR,
        )
        array.attrs["_ARRAY_DIMENSIONS"] = ["filters"]
        array = root.array(
            "filter_description",
            data=[filt.description for filt in filters],
            shape=len(filters),
            dtype="str",
            compressor=DEFAULT_ZARR_COMPRESSOR,
        )
        array.attrs["_ARRAY_DIMENSIONS"] = ["filters"]

    def init_array(self, root, schema, array_spec, variants_dim_size):
        kwargs = dict(zarr_utils.ZARR_FORMAT_KWARGS)
        filters = (
            array_spec.filters
            if array_spec.filters is not None
            else schema.defaults["filters"]
        )
        filters = [numcodecs.get_codec(filt) for filt in filters]
        compressor = (
            array_spec.compressor
            if array_spec.compressor is not None
            else schema.defaults["compressor"]
        )
        compressor = numcodecs.get_codec(compressor)
        if array_spec.dtype == "O":
            if zarr_utils.zarr_v3():
                filters = [*list(filters), numcodecs.VLenUTF8()]
            else:
                kwargs["object_codec"] = numcodecs.VLenUTF8()

        if not zarr_utils.zarr_v3():
            kwargs["dimension_separator"] = self.metadata.dimension_separator

        shape = schema.get_shape(array_spec.dimensions)
        # Truncate the variants dimension if max_variant_chunks was specified
        shape[0] = variants_dim_size
        a = root.empty(
            name=array_spec.name,
            shape=shape,
            chunks=schema.get_chunks(array_spec.dimensions),
            dtype=array_spec.dtype,
            compressor=compressor,
            filters=filters,
            **kwargs,
        )
        a.attrs.update(
            {
                "description": array_spec.description,
                # Dimension names are part of the spec in Zarr v3
                "_ARRAY_DIMENSIONS": array_spec.dimensions,
            }
        )
        logger.debug(f"Initialised {a}")
        return a

    #######################
    # encode_partition
    #######################

    def load_metadata(self):
        if self.metadata is None:
            with open(self.wip_path / "metadata.json") as f:
                self.metadata = VcfZarrWriterMetadata.fromdict(json.load(f))
            self.source = self.source_type(self.metadata.source_path)

    def partition_path(self, partition_index):
        return self.partitions_path / f"p{partition_index}"

    def wip_partition_path(self, partition_index):
        return self.partitions_path / f"wip_p{partition_index}"

    def wip_partition_array_path(self, partition_index, name):
        return self.wip_partition_path(partition_index) / name

    def partition_array_path(self, partition_index, name):
        return self.partition_path(partition_index) / name

    def encode_partition(self, partition_index):
        self.load_metadata()
        if partition_index < 0 or partition_index >= self.num_partitions:
            raise ValueError("Partition index not in the valid range")
        partition_path = self.wip_partition_path(partition_index)
        partition_path.mkdir(exist_ok=True)
        logger.info(f"Encoding partition {partition_index} to {partition_path}")

        all_field_names = [field.name for field in self.schema.fields]
        if "variant_id" in all_field_names:
            self.encode_id_partition(partition_index)
        if "variant_filter" in all_field_names:
            self.encode_filters_partition(partition_index)
        if "variant_contig" in all_field_names:
            self.encode_contig_partition(partition_index)
        self.encode_alleles_and_genotypes_partition(partition_index)
        for array_spec in self.schema.fields:
            if array_spec.source is not None:
                self.encode_array_partition(array_spec, partition_index)
        if self.has_genotypes():
            self.encode_genotype_mask_partition(partition_index)
        if self.has_local_alleles():
            self.encode_local_alleles_partition(partition_index)
            self.encode_local_allele_fields_partition(partition_index)

        final_path = self.partition_path(partition_index)
        logger.info(f"Finalising {partition_index} at {final_path}")
        if final_path.exists():
            logger.warning(f"Removing existing partition at {final_path}")
            shutil.rmtree(final_path)
        os.rename(partition_path, final_path)

    def init_partition_array(self, partition_index, name):
        field_map = self.schema.field_map()
        array_spec = field_map[name]
        # Create an empty array like the definition
        src = self.arrays_path / array_spec.name
        # Overwrite any existing WIP files
        wip_path = self.wip_partition_array_path(partition_index, array_spec.name)
        shutil.copytree(src, wip_path, dirs_exist_ok=True)
        array = zarr.open_array(store=wip_path, mode="a")
        partition = self.metadata.partitions[partition_index]
        ba = core.BufferedArray(array, partition.start, name)
        logger.info(
            f"Start partition {partition_index} array {name} <{array.dtype}> "
            f"{array.shape} @ {wip_path}"
        )
        return ba

    def finalise_partition_array(self, partition_index, buffered_array):
        buffered_array.flush()
        logger.info(
            f"Completed partition {partition_index} array {buffered_array.name} "
            f"max_memory={core.display_size(buffered_array.max_buff_size)}"
        )

    def encode_array_partition(self, array_spec, partition_index):
4✔
820
        partition = self.metadata.partitions[partition_index]
4✔
821
        ba = self.init_partition_array(partition_index, array_spec.name)
4✔
822
        for value in self.source.iter_field(
4✔
823
            array_spec.source,
824
            ba.buff.shape[1:],
825
            partition.start,
826
            partition.stop,
827
        ):
828
            j = ba.next_buffer_row()
4✔
829
            ba.buff[j] = value
4✔
830

831
        self.finalise_partition_array(partition_index, ba)
4✔
832

833
    def encode_alleles_and_genotypes_partition(self, partition_index):
4✔
834
        partition = self.metadata.partitions[partition_index]
4✔
835
        alleles = self.init_partition_array(partition_index, "variant_allele")
4✔
836
        variant_lengths = self.init_partition_array(partition_index, "variant_length")
4✔
837
        has_gt = self.has_genotypes()
4✔
838
        shape = None
4✔
839
        if has_gt:
4✔
840
            gt = self.init_partition_array(partition_index, "call_genotype")
4✔
841
            gt_phased = self.init_partition_array(
4✔
842
                partition_index, "call_genotype_phased"
843
            )
844
            shape = gt.buff.shape[1:]
4✔
845

846
        for variant_data in self.source.iter_alleles_and_genotypes(
4✔
847
            partition.start, partition.stop, shape, alleles.array.shape[1]
848
        ):
849
            j_alleles = alleles.next_buffer_row()
4✔
850
            alleles.buff[j_alleles] = variant_data.alleles
4✔
851
            j_variant_length = variant_lengths.next_buffer_row()
4✔
852
            variant_lengths.buff[j_variant_length] = variant_data.variant_length
4✔
853
            if has_gt:
4✔
854
                j = gt.next_buffer_row()
4✔
855
                gt.buff[j] = variant_data.genotypes
4✔
856
                j_phased = gt_phased.next_buffer_row()
4✔
857
                gt_phased.buff[j_phased] = variant_data.phased
4✔
858

859
        self.finalise_partition_array(partition_index, alleles)
4✔
860
        self.finalise_partition_array(partition_index, variant_lengths)
4✔
861
        if has_gt:
4✔
862
            self.finalise_partition_array(partition_index, gt)
4✔
863
            self.finalise_partition_array(partition_index, gt_phased)
4✔
864

865
    def encode_genotype_mask_partition(self, partition_index):
4✔
866
        partition = self.metadata.partitions[partition_index]
4✔
867
        gt_mask = self.init_partition_array(partition_index, "call_genotype_mask")
4✔
868
        # Read back in the genotypes so we can compute the mask
869
        gt_array = zarr.open_array(
4✔
870
            store=self.wip_partition_array_path(partition_index, "call_genotype"),
871
            mode="r",
872
        )
873
        for genotypes in core.first_dim_slice_iter(
4✔
874
            gt_array, partition.start, partition.stop
875
        ):
876
            # TODO check is this the correct semantics when we are padding
877
            # with mixed ploidies?
878
            j = gt_mask.next_buffer_row()
4✔
879
            gt_mask.buff[j] = genotypes < 0
4✔
880
        self.finalise_partition_array(partition_index, gt_mask)
4✔
881

882
    def encode_local_alleles_partition(self, partition_index):
4✔
883
        partition = self.metadata.partitions[partition_index]
4✔
884
        call_LA = self.init_partition_array(partition_index, "call_LA")
4✔
885

886
        gt_array = zarr.open_array(
4✔
887
            store=self.wip_partition_array_path(partition_index, "call_genotype"),
888
            mode="r",
889
        )
890
        for genotypes in core.first_dim_slice_iter(
4✔
891
            gt_array, partition.start, partition.stop
892
        ):
893
            la = compute_la_field(genotypes)
4✔
894
            j = call_LA.next_buffer_row()
4✔
895
            call_LA.buff[j] = la
4✔
896
        self.finalise_partition_array(partition_index, call_LA)
4✔
897

898
    def encode_local_allele_fields_partition(self, partition_index):
4✔
899
        partition = self.metadata.partitions[partition_index]
4✔
900
        la_array = zarr.open_array(
4✔
901
            store=self.wip_partition_array_path(partition_index, "call_LA"),
902
            mode="r",
903
        )
904
        # We got through the localisable fields one-by-one so that we don't need to
905
        # keep several large arrays in memory at once for each partition.
906
        field_map = self.schema.field_map()
4✔
907
        for descriptor in localisable_fields:
4✔
908
            if descriptor.array_name not in field_map:
4✔
909
                continue
4✔
910
            assert field_map[descriptor.array_name].source is None
4✔
911

912
            buff = self.init_partition_array(partition_index, descriptor.array_name)
4✔
913
            source = self.source.fields[descriptor.vcf_field].iter_values(
4✔
914
                partition.start, partition.stop
915
            )
916
            for la in core.first_dim_slice_iter(
4✔
917
                la_array, partition.start, partition.stop
918
            ):
919
                raw_value = next(source)
4✔
920
                value = descriptor.sanitise(raw_value, 2, raw_value.dtype)
4✔
921
                j = buff.next_buffer_row()
4✔
922
                buff.buff[j] = descriptor.convert(value, la)
4✔
923
            self.finalise_partition_array(partition_index, buff)
4✔
924

    def encode_id_partition(self, partition_index):
        vid = self.init_partition_array(partition_index, "variant_id")
        vid_mask = self.init_partition_array(partition_index, "variant_id_mask")
        partition = self.metadata.partitions[partition_index]

        for value in self.source.iter_id(partition.start, partition.stop):
            j = vid.next_buffer_row()
            k = vid_mask.next_buffer_row()
            assert j == k
            if value is not None:
                vid.buff[j] = value
                vid_mask.buff[j] = False
            else:
                vid.buff[j] = constants.STR_MISSING
                vid_mask.buff[j] = True

        self.finalise_partition_array(partition_index, vid)
        self.finalise_partition_array(partition_index, vid_mask)

    def encode_filters_partition(self, partition_index):
        var_filter = self.init_partition_array(partition_index, "variant_filter")
        partition = self.metadata.partitions[partition_index]

        for filter_values in self.source.iter_filters(partition.start, partition.stop):
            j = var_filter.next_buffer_row()
            var_filter.buff[j] = filter_values

        self.finalise_partition_array(partition_index, var_filter)

    def encode_contig_partition(self, partition_index):
        contig = self.init_partition_array(partition_index, "variant_contig")
        partition = self.metadata.partitions[partition_index]

        for contig_index in self.source.iter_contig(partition.start, partition.stop):
            j = contig.next_buffer_row()
            contig.buff[j] = contig_index

        self.finalise_partition_array(partition_index, contig)

    #######################
    # finalise
    #######################

    def finalise_array(self, name):
        logger.info(f"Finalising {name}")
        final_path = self.path / name
        if final_path.exists():
            # NEEDS TEST
            raise ValueError(f"Array {name} already exists")
        for partition in range(self.num_partitions):
            # Move all the files in partition dir to dest dir
            src = self.partition_array_path(partition, name)
            if not src.exists():
                # Needs test
                raise ValueError(f"Partition {partition} of {name} does not exist")
            dest = self.arrays_path / name
            # This is Zarr v2 specific. Chunks in v3 will start with a "c" prefix.
            chunk_files = [
                path for path in src.iterdir() if not path.name.startswith(".")
            ]
            # TODO check for a count of the number of files. If we require a
            # dimension_separator of "/" then we could make stronger assertions
            # here, as we'd always have num_variant_chunks
            logger.debug(
                f"Moving {len(chunk_files)} chunks for {name} partition {partition}"
            )
            for chunk_file in chunk_files:
                os.rename(chunk_file, dest / chunk_file.name)
        # Finally, once all the chunks have moved into the arrays dir,
        # we move it out of wip
        os.rename(self.arrays_path / name, self.path / name)
        core.update_progress(1)

    def finalise(self, show_progress=False):
        self.load_metadata()

        logger.info(f"Scanning {self.num_partitions} partitions")
        missing = []
        # TODO may need a progress bar here
        for partition_id in range(self.num_partitions):
            if not self.partition_path(partition_id).exists():
                missing.append(partition_id)
        if len(missing) > 0:
            raise FileNotFoundError(f"Partitions not encoded: {missing}")

        progress_config = core.ProgressConfig(
            total=len(self.schema.fields),
            title="Finalise",
            units="array",
            show=show_progress,
        )
        # NOTE: it's not clear that adding more workers will make this quicker,
        # as it's just going to be causing contention on the file system.
        # Something to check empirically in some deployments.
        # FIXME we're just using worker_processes=0 here to hook into the
        # SynchronousExecutor which is intended for testing purposes so
        # that we get test coverage. Should fix this either by allowing
        # for multiple workers, or making a standard wrapper for tqdm
        # that allows us to have a consistent look and feel.
        with core.ParallelWorkManager(0, progress_config) as pwm:
            for field in self.schema.fields:
                pwm.submit(self.finalise_array, field.name)
        logger.debug(f"Removing {self.wip_path}")
        shutil.rmtree(self.wip_path)
        logger.info("Consolidating Zarr metadata")
        zarr.consolidate_metadata(self.path)

    #######################
    # index
    #######################

    def create_index(self):
4✔
1037
        """Create an index to support efficient region queries."""
1038

1039
        indexer = VcfZarrIndexer(self.path)
4✔
1040
        indexer.create_index()
4✔
1041

1042
    ######################
1043
    # encode_all_partitions
1044
    ######################
1045

1046
    def get_max_encoding_memory(self):
4✔
1047
        """
1048
        Return the approximate maximum memory used to encode a variant chunk.
1049
        """
1050
        max_encoding_mem = 0
4✔
1051
        for array_spec in self.schema.fields:
4✔
1052
            max_encoding_mem = max(
4✔
1053
                max_encoding_mem, array_spec.variant_chunk_nbytes(self.schema)
1054
            )
1055
        gt_mem = 0
4✔
1056
        if self.has_genotypes:
4✔
1057
            gt_mem = sum(
4✔
1058
                field.variant_chunk_nbytes(self.schema)
1059
                for field in self.schema.fields
1060
                if field.name.startswith("call_genotype")
1061
            )
1062
        return max(max_encoding_mem, gt_mem)
4✔
1063

    def encode_all_partitions(
        self, *, worker_processes=1, show_progress=False, max_memory=None
    ):
        max_memory = core.parse_max_memory(max_memory)
        self.load_metadata()
        num_partitions = self.num_partitions
        per_worker_memory = self.get_max_encoding_memory()
        logger.info(
            f"Encoding Zarr over {num_partitions} partitions with "
            f"{worker_processes} workers and {core.display_size(per_worker_memory)} "
            "per worker"
        )
        # Each partition requires per_worker_memory bytes, so to prevent more than
        # max_memory being used, we clamp the number of workers
        max_num_workers = max_memory // per_worker_memory
        if max_num_workers < worker_processes:
            logger.warning(
                f"Limiting number of workers to {max_num_workers} to "
                "keep within specified memory budget of "
                f"{core.display_size(max_memory)}"
            )
        if max_num_workers <= 0:
            raise ValueError(
                "Insufficient memory to encode a partition: "
                f"{core.display_size(per_worker_memory)} > "
                f"{core.display_size(max_memory)}"
            )
        num_workers = min(max_num_workers, worker_processes)

        total_bytes = 0
        for array_spec in self.schema.fields:
            # Open the array definition to get the total size
            total_bytes += zarr.open(self.arrays_path / array_spec.name).nbytes

        progress_config = core.ProgressConfig(
            total=total_bytes,
            title="Encode",
            units="B",
            show=show_progress,
        )
        with core.ParallelWorkManager(num_workers, progress_config) as pwm:
            for partition_index in range(num_partitions):
                pwm.submit(self.encode_partition, partition_index)


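# Hedged usage sketch (not from the original file) of the writer lifecycle,
# assuming `source` is a concrete Source and `schema` was produced by
# source.generate_schema(...); the literal arguments are illustrative:
#
#   writer = VcfZarrWriter(type(source), "out.vcz")
#   writer.init(source, target_num_partitions=8, schema=schema)
#   writer.encode_all_partitions(worker_processes=8, show_progress=True)
#   writer.finalise(show_progress=True)
#   writer.create_index()

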
class VcfZarr:
    def __init__(self, path):
        if not (path / ".zmetadata").exists():
            raise ValueError("Not in VcfZarr format")  # NEEDS TEST
        self.path = path
        self.root = zarr.open(path, mode="r")

    def summary_table(self):
        data = []
        arrays = [(core.du(self.path / a.basename), a) for _, a in self.root.arrays()]
        arrays.sort(key=lambda x: x[0])
        for stored, array in reversed(arrays):
            d = {
                "name": array.name,
                "dtype": str(array.dtype),
                "stored": core.display_size(stored),
                "size": core.display_size(array.nbytes),
                "ratio": core.display_number(array.nbytes / stored),
                "nchunks": str(array.nchunks),
                "chunk_size": core.display_size(array.nbytes / array.nchunks),
                "avg_chunk_stored": core.display_size(int(stored / array.nchunks)),
                "shape": str(array.shape),
                "chunk_shape": str(array.chunks),
                "compressor": str(array.compressor),
                "filters": str(array.filters),
            }
            data.append(d)
        return data


class VcfZarrIndexer:
    """
    Creates an index for efficient region queries in a VCF Zarr dataset.
    """

    def __init__(self, path):
        self.path = pathlib.Path(path)

    def create_index(self):
        """Create an index to support efficient region queries."""
        root = zarr.open_group(store=self.path, mode="r+")
        if (
            "variant_contig" not in root
            or "variant_position" not in root
            or "variant_length" not in root
        ):
            raise ValueError(
                "Cannot create index: variant_contig, "
                "variant_position and variant_length arrays are required"
            )

        contig = root["variant_contig"]
        pos = root["variant_position"]
        length = root["variant_length"]

        assert contig.cdata_shape == pos.cdata_shape

        index = []

        logger.info("Creating region index")
        for v_chunk in range(pos.cdata_shape[0]):
            c = contig.blocks[v_chunk]
            p = pos.blocks[v_chunk]
            e = p + length.blocks[v_chunk] - 1

            # create a row for each contig in the chunk
            d = np.diff(c, append=-1)
            c_start_idx = 0
            for c_end_idx in np.nonzero(d)[0]:
                assert c[c_start_idx] == c[c_end_idx]
                index.append(
                    (
                        v_chunk,  # chunk index
                        c[c_start_idx],  # contig ID
                        p[c_start_idx],  # start
                        p[c_end_idx],  # end
                        np.max(e[c_start_idx : c_end_idx + 1]),  # max end
                        c_end_idx - c_start_idx + 1,  # num records
                    )
                )
                c_start_idx = c_end_idx + 1

        index = np.array(index, dtype=pos.dtype)
        kwargs = {}
        if not zarr_utils.zarr_v3():
            kwargs["dimension_separator"] = "/"
        array = root.array(
            "region_index",
            data=index,
            shape=index.shape,
            chunks=index.shape,
            dtype=index.dtype,
            compressor=numcodecs.Blosc("zstd", clevel=9, shuffle=0),
            fill_value=None,
            **kwargs,
        )
        array.attrs["_ARRAY_DIMENSIONS"] = [
            "region_index_values",
            "region_index_fields",
        ]

        logger.info("Consolidating Zarr metadata")
        zarr.consolidate_metadata(self.path)