• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

sgkit-dev / bio2zarr / 14242614654

03 Apr 2025 12:10PM UTC coverage: 98.771% (-0.1%) from 98.867%
14242614654

Pull #339

github

web-flow
Merge 965501277 into 5439661d3
Pull Request #339: Common writer for plink and ICF

771 of 782 new or added lines in 6 files covered. (98.59%)

12 existing lines in 2 files now uncovered.

2571 of 2603 relevant lines covered (98.77%)

5.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

99.19
/bio2zarr/schema.py
1
import dataclasses
6✔
2
import json
6✔
3
import logging
6✔
4

5
import numcodecs
6✔
6
import numpy as np
6✔
7

8
from bio2zarr import core
6✔
9

10
logger = logging.getLogger(__name__)
6✔
11

12
ZARR_SCHEMA_FORMAT_VERSION = "0.4"
6✔
13

14
DEFAULT_ZARR_COMPRESSOR = numcodecs.Blosc(cname="zstd", clevel=7)
6✔
15

16
_fixed_field_descriptions = {
6✔
17
    "variant_contig": "An identifier from the reference genome or an angle-bracketed ID"
18
    " string pointing to a contig in the assembly file",
19
    "variant_position": "The reference position",
20
    "variant_length": "The length of the variant measured in bases",
21
    "variant_id": "List of unique identifiers where applicable",
22
    "variant_allele": "List of the reference and alternate alleles",
23
    "variant_quality": "Phred-scaled quality score",
24
    "variant_filter": "Filter status of the variant",
25
}
26

27

28
@dataclasses.dataclass
6✔
29
class ZarrArraySpec:
6✔
30
    name: str
6✔
31
    dtype: str
6✔
32
    shape: tuple
6✔
33
    chunks: tuple
6✔
34
    dimensions: tuple
6✔
35
    description: str
6✔
36
    vcf_field: str
6✔
37
    compressor: dict
6✔
38
    filters: list
6✔
39

40
    def __post_init__(self):
6✔
41
        if self.name in _fixed_field_descriptions:
6✔
42
            self.description = self.description or _fixed_field_descriptions[self.name]
6✔
43

44
        # Ensure these are tuples for ease of comparison and consistency
45
        self.shape = tuple(self.shape)
6✔
46
        self.chunks = tuple(self.chunks)
6✔
47
        self.dimensions = tuple(self.dimensions)
6✔
48
        self.filters = tuple(self.filters)
6✔
49

50
    @staticmethod
6✔
51
    def new(**kwargs):
6✔
52
        spec = ZarrArraySpec(
6✔
53
            **kwargs, compressor=DEFAULT_ZARR_COMPRESSOR.get_config(), filters=[]
54
        )
55
        spec._choose_compressor_settings()
6✔
56
        return spec
6✔
57

58
    @staticmethod
6✔
59
    def from_field(
6✔
60
        vcf_field,
61
        *,
62
        num_variants,
63
        num_samples,
64
        variants_chunk_size,
65
        samples_chunk_size,
66
        array_name=None,
67
    ):
68
        shape = [num_variants]
6✔
69
        prefix = "variant_"
6✔
70
        dimensions = ["variants"]
6✔
71
        chunks = [variants_chunk_size]
6✔
72
        if vcf_field.category == "FORMAT":
6✔
73
            prefix = "call_"
6✔
74
            shape.append(num_samples)
6✔
75
            chunks.append(samples_chunk_size)
6✔
76
            dimensions.append("samples")
6✔
77
        if array_name is None:
6✔
78
            array_name = prefix + vcf_field.name
6✔
79
        # TODO make an option to add in the empty extra dimension
80
        if vcf_field.summary.max_number > 1 or vcf_field.full_name == "FORMAT/LAA":
6✔
81
            shape.append(vcf_field.summary.max_number)
6✔
82
            chunks.append(vcf_field.summary.max_number)
6✔
83
            # TODO we should really be checking this to see if the named dimensions
84
            # are actually correct.
85
            if vcf_field.vcf_number == "R":
6✔
86
                dimensions.append("alleles")
6✔
87
            elif vcf_field.vcf_number == "A":
6✔
88
                dimensions.append("alt_alleles")
6✔
89
            elif vcf_field.vcf_number == "G":
6✔
90
                dimensions.append("genotypes")
6✔
91
            else:
92
                dimensions.append(f"{vcf_field.category}_{vcf_field.name}_dim")
6✔
93
        return ZarrArraySpec.new(
6✔
94
            vcf_field=vcf_field.full_name,
95
            name=array_name,
96
            dtype=vcf_field.smallest_dtype(),
97
            shape=shape,
98
            chunks=chunks,
99
            dimensions=dimensions,
100
            description=vcf_field.description,
101
        )
102

103
    def _choose_compressor_settings(self):
6✔
104
        """
105
        Choose compressor and filter settings based on the size and
106
        type of the array, plus some hueristics from observed properties
107
        of VCFs.
108

109
        See https://github.com/pystatgen/bio2zarr/discussions/74
110
        """
111
        # Default is to not shuffle, because autoshuffle isn't recognised
112
        # by many Zarr implementations, and shuffling can lead to worse
113
        # performance in some cases anyway. Turning on shuffle should be a
114
        # deliberate choice.
115
        shuffle = numcodecs.Blosc.NOSHUFFLE
6✔
116
        if self.name == "call_genotype" and self.dtype == "i1":
6✔
117
            # call_genotype gets BITSHUFFLE by default as it gets
118
            # significantly better compression (at a cost of slower
119
            # decoding)
120
            shuffle = numcodecs.Blosc.BITSHUFFLE
6✔
121
        elif self.dtype == "bool":
6✔
122
            shuffle = numcodecs.Blosc.BITSHUFFLE
6✔
123

124
        self.compressor["shuffle"] = shuffle
6✔
125

126
    @property
6✔
127
    def chunk_nbytes(self):
6✔
128
        """
129
        Returns the nbytes for a single chunk in this array.
130
        """
131
        items = 1
6✔
132
        dim = 0
6✔
133
        for chunk_size in self.chunks:
6✔
134
            size = min(chunk_size, self.shape[dim])
6✔
135
            items *= size
6✔
136
            dim += 1
6✔
137
        # Include sizes for extra dimensions.
138
        for size in self.shape[dim:]:
6✔
NEW
139
            items *= size
×
140
        dt = np.dtype(self.dtype)
6✔
141
        return items * dt.itemsize
6✔
142

143
    @property
6✔
144
    def variant_chunk_nbytes(self):
6✔
145
        """
146
        Returns the nbytes for a single variant chunk of this array.
147
        """
148
        chunk_items = self.chunks[0]
6✔
149
        for size in self.shape[1:]:
6✔
150
            chunk_items *= size
6✔
151
        dt = np.dtype(self.dtype)
6✔
152
        if dt.kind == "O" and "samples" in self.dimensions:
6✔
153
            logger.warning(
6✔
154
                f"Field {self.name} is a string; max memory usage may "
155
                "be a significant underestimate"
156
            )
157
        return chunk_items * dt.itemsize
6✔
158

159

160
@dataclasses.dataclass
6✔
161
class Contig:
6✔
162
    id: str
6✔
163
    length: int = None
6✔
164

165

166
@dataclasses.dataclass
6✔
167
class Sample:
6✔
168
    id: str
6✔
169

170

171
@dataclasses.dataclass
6✔
172
class Filter:
6✔
173
    id: str
6✔
174
    description: str = ""
6✔
175

176

177
@dataclasses.dataclass
6✔
178
class VcfZarrSchema(core.JsonDataclass):
6✔
179
    format_version: str
6✔
180
    samples_chunk_size: int
6✔
181
    variants_chunk_size: int
6✔
182
    samples: list
6✔
183
    contigs: list
6✔
184
    filters: list
6✔
185
    fields: list
6✔
186

187
    def validate(self):
6✔
188
        """
189
        Checks that the schema is well-formed and within required limits.
190
        """
191
        for field in self.fields:
6✔
192
            # This is the Blosc max buffer size
193
            if field.chunk_nbytes > 2147483647:
6✔
194
                # TODO add some links to documentation here advising how to
195
                # deal with PL values.
196
                raise ValueError(
6✔
197
                    f"Field {field.name} chunks are too large "
198
                    f"({field.chunk_nbytes} > 2**31 - 1 bytes). "
199
                    "Either generate a schema and drop this field (if you don't "
200
                    "need it) or reduce the variant or sample chunk sizes."
201
                )
202
            # TODO other checks? There must be lots of ways people could mess
203
            # up the schema leading to cryptic errors.
204

205
    def field_map(self):
6✔
206
        return {field.name: field for field in self.fields}
6✔
207

208
    @staticmethod
6✔
209
    def fromdict(d):
6✔
210
        if d["format_version"] != ZARR_SCHEMA_FORMAT_VERSION:
6✔
211
            raise ValueError(
6✔
212
                "Zarr schema format version mismatch: "
213
                f"{d['format_version']} != {ZARR_SCHEMA_FORMAT_VERSION}"
214
            )
215
        ret = VcfZarrSchema(**d)
6✔
216
        ret.samples = [Sample(**sd) for sd in d["samples"]]
6✔
217
        ret.contigs = [Contig(**sd) for sd in d["contigs"]]
6✔
218
        ret.filters = [Filter(**sd) for sd in d["filters"]]
6✔
219
        ret.fields = [ZarrArraySpec(**sd) for sd in d["fields"]]
6✔
220
        return ret
6✔
221

222
    @staticmethod
6✔
223
    def fromjson(s):
6✔
224
        return VcfZarrSchema.fromdict(json.loads(s))
6✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc