• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

sgkit-dev / bio2zarr / 18940786519

30 Oct 2025 12:34PM UTC coverage: 98.361% (+0.07%) from 98.292%
18940786519

Pull #422

github

web-flow
Merge b33cd4646 into 3f955cef0
Pull Request #422: Update intro.md with VCF Zarr conversion info

2880 of 2928 relevant lines covered (98.36%)

3.93 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

99.4
/bio2zarr/vcz_verification.py
1
import numpy as np
4✔
2
import numpy.testing as nt
4✔
3
import tqdm
4✔
4
import zarr
4✔
5

6
from bio2zarr import core
4✔
7
from bio2zarr.zarr_utils import first_dim_iter
4✔
8

9
from . import constants
4✔
10

11

12
def assert_all_missing_float(a):
4✔
13
    v = np.array(a, dtype=np.float32).view(np.int32)
4✔
14
    nt.assert_equal(v, constants.FLOAT32_MISSING_AS_INT32)
4✔
15

16

17
def assert_all_fill_float(a):
4✔
18
    v = np.array(a, dtype=np.float32).view(np.int32)
4✔
19
    nt.assert_equal(v, constants.FLOAT32_FILL_AS_INT32)
4✔
20

21

22
def assert_all_missing_int(a):
4✔
23
    v = np.array(a, dtype=int)
4✔
24
    nt.assert_equal(v, constants.INT_MISSING)
4✔
25

26

27
def assert_all_fill_int(a):
4✔
28
    v = np.array(a, dtype=int)
4✔
29
    nt.assert_equal(v, constants.INT_FILL)
4✔
30

31

32
def assert_all_missing_string(a):
4✔
33
    nt.assert_equal(a, constants.STR_MISSING)
4✔
34

35

36
def assert_all_fill_string(a):
4✔
37
    nt.assert_equal(a, constants.STR_FILL)
4✔
38

39

40
def assert_all_fill(zarr_val, vcf_type):
4✔
41
    if vcf_type == "Integer":
4✔
42
        assert_all_fill_int(zarr_val)
4✔
43
    elif vcf_type in ("String", "Character"):
4✔
44
        assert_all_fill_string(zarr_val)
4✔
45
    elif vcf_type == "Float":
4✔
46
        assert_all_fill_float(zarr_val)
4✔
47
    else:  # pragma: no cover
48
        assert False  # noqa PT015
49

50

51
def assert_all_missing(zarr_val, vcf_type):
4✔
52
    if vcf_type == "Integer":
4✔
53
        assert_all_missing_int(zarr_val)
4✔
54
    elif vcf_type in ("String", "Character"):
4✔
55
        assert_all_missing_string(zarr_val)
4✔
56
    elif vcf_type == "Flag":
4✔
57
        assert zarr_val == False  # noqa 712
4✔
58
    elif vcf_type == "Float":
4✔
59
        assert_all_missing_float(zarr_val)
4✔
60
    else:  # pragma: no cover
61
        assert False  # noqa PT015
62

63

64
def assert_info_val_missing(zarr_val, vcf_type):
4✔
65
    assert_all_missing(zarr_val, vcf_type)
4✔
66

67

68
def assert_format_val_missing(zarr_val, vcf_type):
4✔
69
    assert_info_val_missing(zarr_val, vcf_type)
4✔
70

71

72
# Note: checking exact equality may prove problematic here
73
# but we should be deterministically storing what cyvcf2
74
# provides, which should compare equal.
75

76

77
def assert_info_val_equal(vcf_val, zarr_val, vcf_type):
4✔
78
    assert vcf_val is not None
4✔
79
    if vcf_type in ("String", "Character"):
4✔
80
        split = list(vcf_val.split(","))
4✔
81
        k = len(split)
4✔
82
        if isinstance(zarr_val, str) or zarr_val.ndim == 0:
4✔
83
            assert k == 1
4✔
84
            # Scalar
85
            assert vcf_val == zarr_val
4✔
86
        else:
87
            nt.assert_equal(split, zarr_val[:k])
4✔
88
            assert_all_fill(zarr_val[k:], vcf_type)
4✔
89

90
    elif isinstance(vcf_val, tuple):
4✔
91
        vcf_missing_value_map = {
4✔
92
            "Integer": constants.INT_MISSING,
93
            "Float": constants.FLOAT32_MISSING,
94
        }
95
        v = [vcf_missing_value_map[vcf_type] if x is None else x for x in vcf_val]
4✔
96
        missing = np.array([j for j, x in enumerate(vcf_val) if x is None], dtype=int)
4✔
97
        a = np.array(v)
4✔
98
        k = len(a)
4✔
99
        # We are checking for int missing twice here, but it's necessary to have
100
        # a separate check for floats because different NaNs compare equal
101
        nt.assert_equal(a, zarr_val[:k])
4✔
102
        assert_all_missing(zarr_val[missing], vcf_type)
4✔
103
        if k < len(zarr_val):
4✔
104
            assert_all_fill(zarr_val[k:], vcf_type)
4✔
105
    else:
106
        # Scalar
107
        zarr_val = np.array(zarr_val, ndmin=1)
4✔
108
        assert len(zarr_val.shape) == 1
4✔
109
        assert vcf_val == zarr_val[0]
4✔
110
        if len(zarr_val) > 1:
4✔
111
            assert_all_fill(zarr_val[1:], vcf_type)
4✔
112

113

114
def assert_format_val_equal(vcf_val, zarr_val, vcf_type, vcf_number):
4✔
115
    assert vcf_val is not None
4✔
116
    assert isinstance(vcf_val, np.ndarray)
4✔
117
    if vcf_type in ("String", "Character"):
4✔
118
        assert len(vcf_val) == len(zarr_val)
4✔
119
        for v, z in zip(vcf_val, zarr_val):
4✔
120
            if vcf_number == "1":
4✔
121
                assert v == z
4✔
122
            else:
123
                split = list(v.split(","))
4✔
124
                k = len(split)
4✔
125
                nt.assert_equal(split, z[:k])
4✔
126
                assert_all_fill(z[k:], vcf_type)
4✔
127
    else:
128
        assert vcf_val.shape[0] == zarr_val.shape[0]
4✔
129
        if len(vcf_val.shape) == len(zarr_val.shape) + 1:
4✔
130
            assert vcf_val.shape[-1] == 1
4✔
131
            vcf_val = vcf_val[..., 0]
4✔
132
        assert len(vcf_val.shape) <= 2
4✔
133
        assert len(vcf_val.shape) == len(zarr_val.shape)
4✔
134
        if len(vcf_val.shape) == 2:
4✔
135
            k = vcf_val.shape[1]
4✔
136
            if zarr_val.shape[1] != k:
4✔
137
                assert_all_fill(zarr_val[:, k:], vcf_type)
4✔
138
                zarr_val = zarr_val[:, :k]
4✔
139
        assert vcf_val.shape == zarr_val.shape
4✔
140
        if vcf_type == "Integer":
4✔
141
            vcf_val[vcf_val == constants.VCF_INT_MISSING] = constants.INT_MISSING
4✔
142
            vcf_val[vcf_val == constants.VCF_INT_FILL] = constants.INT_FILL
4✔
143
        elif vcf_type == "Float":
4✔
144
            nt.assert_equal(vcf_val.view(np.int32), zarr_val.view(np.int32))
4✔
145

146
        nt.assert_equal(vcf_val, zarr_val)
4✔
147

148

149
@core.requires_optional_dependency("cyvcf2", "vcf")
4✔
150
def verify(vcf_path, zarr_path, show_progress=False):
4✔
151
    import cyvcf2
4✔
152

153
    root = zarr.open(store=zarr_path, mode="r")
4✔
154
    pos = root["variant_position"][:]
4✔
155
    allele = root["variant_allele"][:]
4✔
156
    chrom = root["contig_id"][:][root["variant_contig"][:]]
4✔
157
    vid = root["variant_id"][:]
4✔
158
    call_genotype = None
4✔
159
    if "call_genotype" in root and root["call_genotype"].size > 0:
4✔
160
        call_genotype = first_dim_iter(root["call_genotype"])
4✔
161

162
    vcf = cyvcf2.VCF(vcf_path)
4✔
163
    format_headers = {}
4✔
164
    info_headers = {}
4✔
165
    for h in vcf.header_iter():
4✔
166
        if h["HeaderType"] == "FORMAT":
4✔
167
            format_headers[h["ID"]] = h
4✔
168
        if h["HeaderType"] == "INFO":
4✔
169
            info_headers[h["ID"]] = h
4✔
170

171
    format_fields = {}
4✔
172
    info_fields = {}
4✔
173
    for colname in root.keys():
4✔
174
        if colname.startswith("call") and not colname.startswith("call_genotype"):
4✔
175
            vcf_name = colname.split("_", 1)[1]
4✔
176
            vcf_type = format_headers[vcf_name]["Type"]
4✔
177
            vcf_number = format_headers[vcf_name]["Number"]
4✔
178
            format_fields[vcf_name] = (
4✔
179
                vcf_type,
180
                vcf_number,
181
                first_dim_iter(root[colname]),
182
            )
183
        if colname.startswith("variant"):
4✔
184
            name = colname.split("_", 1)[1]
4✔
185
            if name.isupper():
4✔
186
                vcf_type = info_headers[name]["Type"]
4✔
187
                info_fields[name] = vcf_type, first_dim_iter(root[colname])
4✔
188

189
    first_pos = next(vcf).POS
4✔
190
    start_index = np.searchsorted(pos, first_pos)
4✔
191
    assert pos[start_index] == first_pos
4✔
192
    vcf = cyvcf2.VCF(vcf_path)
4✔
193
    if show_progress:
4✔
194
        iterator = tqdm.tqdm(vcf, desc="  Verify", total=vcf.num_records)  # NEEDS TEST
×
195
    else:
196
        iterator = vcf
4✔
197
    for j, row in enumerate(iterator, start_index):
4✔
198
        assert chrom[j] == row.CHROM
4✔
199
        assert pos[j] == row.POS
4✔
200
        assert vid[j] == ("." if row.ID is None else row.ID)
4✔
201
        assert allele[j, 0] == row.REF
4✔
202
        k = len(row.ALT)
4✔
203
        nt.assert_array_equal(allele[j, 1 : k + 1], row.ALT)
4✔
204
        assert np.all(allele[j, k + 1 :] == "")
4✔
205
        # TODO FILTERS
206

207
        if call_genotype is None:
4✔
208
            val = None
4✔
209
            try:
4✔
210
                val = row.format("GT")
4✔
211
            except KeyError:
4✔
212
                pass
4✔
213
            assert val is None
4✔
214
        else:
215
            gt = row.genotype.array()
4✔
216
            gt_zarr = next(call_genotype)
4✔
217
            gt_vcf = gt[:, :-1]
4✔
218
            # NOTE cyvcf2 remaps genotypes automatically
219
            # into the same missing/pad encoding that sgkit uses.
220
            nt.assert_array_equal(gt_zarr, gt_vcf)
4✔
221

222
        for name, (vcf_type, zarr_iter) in info_fields.items():
4✔
223
            vcf_val = row.INFO.get(name, None)
4✔
224
            zarr_val = next(zarr_iter)
4✔
225
            if vcf_val is None:
4✔
226
                assert_info_val_missing(zarr_val, vcf_type)
4✔
227
            else:
228
                assert_info_val_equal(vcf_val, zarr_val, vcf_type)
4✔
229

230
        for name, (vcf_type, vcf_number, zarr_iter) in format_fields.items():
4✔
231
            vcf_val = row.format(name)
4✔
232
            zarr_val = next(zarr_iter)
4✔
233
            if vcf_val is None:
4✔
234
                assert_format_val_missing(zarr_val, vcf_type)
4✔
235
            else:
236
                assert_format_val_equal(vcf_val, zarr_val, vcf_type, vcf_number)
4✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc