• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

cnr-ibba / SMARTER-database / 9189630163

22 May 2024 10:26AM UTC coverage: 94.311%. Remained the same
9189630163

Pull #121

github

web-flow
Merge 389cf0ae7 into fcdb3ce6a
Pull Request #121: ⬆️ Bump jinja2 from 3.1.2 to 3.1.4

2984 of 3164 relevant lines covered (94.31%)

0.94 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.81
/src/data/import_dbsnp.py
1
#!/usr/bin/env python3
2
# -*- coding: utf-8 -*-
3
"""
1✔
4
Created on Wed Mar  3 18:07:13 2021
5

6
@author: Paolo Cozzi <paolo.cozzi@ibba.cnr.it>
7

8
Load data from dbSNP dump files and update illumina SNPs
9
"""
10

11
import click
1✔
12
import logging
1✔
13
import pathlib
1✔
14

15
from typing import Union
1✔
16
from functools import partial
1✔
17

18
from src.features.smarterdb import (
1✔
19
    global_connection, SupportedChip, Location, VariantSheep, VariantGoat)
20
from src.features.dbsnp import read_dbSNP, search_chip_snps
1✔
21
from src.features.illumina import IlluSNP
1✔
22
from src.data.common import (
1✔
23
    get_variant_species, update_location, update_rs_id, AssemblyConf)
24

25
logger = logging.getLogger(__name__)
1✔
26

27
# global variables (defined in main)
28
VariantSpecie = None
1✔
29
assembly_conf = None
1✔
30

31

32
def search_variant(
1✔
33
        sss: dict,
34
        rs_id: str,
35
        locSnpIds: set[str],
36
        VariantSpecie: Union[VariantSheep, VariantGoat]) -> list[
37
            Union[VariantSheep, VariantGoat]]:
38
    """
39
    Return VariantSheep or VariantGoat instance for the provided SS
40
    evidences
41

42
    Parameters
43
    ----------
44
    sss : dict
45
        A dictionary with all the dbSNP SS hits for illuminas.
46
    rs_id : str
47
        The rsId identifier of such SNP.
48
    locSnpIds : set[str]
49
        A list of Illumina probe names.
50
    VariantSpecie : Union[VariantSheep, VariantGoat]
51
        The specie class (VariantSheep or VariantGoat) to be used when
52
        searching in SMARTER database
53

54
    Returns
55
    -------
56
    list[Union[VariantSheep, VariantGoat]]
57
        A list of variants (Sheep Or Goat).
58
    """
59

60
    if len(sss) > 1:
1✔
61
        logger.debug(f"Got {len(sss)} ss for '{rs_id}'")
1✔
62
        variants = VariantSpecie.objects.filter(name__in=list(locSnpIds))
1✔
63

64
    elif len(sss) == 1:
×
65
        ss = sss[0]
×
66

67
        # ok get a variant from database and return it
68
        variants = VariantSpecie.objects.filter(name=ss['locSnpId'])
×
69

70
    if len(variants) > 1:
1✔
71
        logger.warning(
×
72
            f"Got {len(variants)} Variants for '{rs_id}'")
73

74
    return variants
1✔
75

76

77
def process_variant(
1✔
78
        snp: dict,
79
        variant: Union[VariantSheep, VariantGoat],
80
        supported_chips: list) -> Location:
81
    """
82
    Process a SNP read from dbSNP XML file and return a new Location object
83

84
    Parameters
85
    ----------
86
    snp : dict
87
        A dictionary with all data from a dbSNP rsId.
88
    variant : Union[VariantSheep, VariantGoat]
89
        A SMARTER variant object.
90
    supported_chips : list
91
        A list of supported chips as string.
92

93
    Returns
94
    -------
95
    Location
96
        A SMARTER Location object for the read SNP.
97

98
    """
99
    global assembly_conf
100

101
    logger.debug(f"Processing '{variant.name}'")
1✔
102

103
    # get the SS relying on ss[locSnpId']
104
    ss = next(filter(lambda ss: ss['locSnpId'] == variant.name, snp['ss']))
1✔
105

106
    logger.debug(f"Got {ss} as ss data")
1✔
107

108
    assembly = snp.get('assembly')
1✔
109

110
    logger.debug(f"Got {assembly} as assembly")
1✔
111

112
    # next: I need to determine the illumina top for this SNP
113
    for chip_name in supported_chips:
1✔
114
        if chip_name in variant.sequence:
1✔
115
            sequence = variant.sequence[chip_name]
1✔
116
            break
1✔
117

118
    illu_snp = IlluSNP(sequence=sequence, max_iter=25)
1✔
119

120
    chromosome = "0"
1✔
121
    position = 0
1✔
122

123
    if (assembly and
1✔
124
            'chromosome' in assembly['component'] and
125
            assembly['snpstat']['mapWeight'] == 'unique-in-contig'):
126

127
        # read chromosome and position
128
        chromosome = assembly['component']['chromosome']
1✔
129
        position = int(assembly['component']['maploc']['physMapInt'])+1
1✔
130

131
    # Using assembly_conf when creating a Location
132
    return Location(
1✔
133
        ss_id=f"ss{ss['ssId']}",
134
        version=assembly_conf.version,
135
        imported_from=assembly_conf.imported_from,
136
        chrom=chromosome,
137
        position=position,
138
        alleles=ss['observed'],
139
        illumina_strand=ss.get('strand', illu_snp.strand),
140
        strand=ss.get('orient'),
141
        illumina=illu_snp.illumina
142
    )
143

144

145
def process_dbsnp_file(
1✔
146
        input_file: pathlib.Path,
147
        sender: str,
148
        all_snp_names: set[str],
149
        supported_chips: list[str]):
150
    """
151
    Process a single dbSNP file and put data into database
152

153
    Parameters
154
    ----------
155
    input_file : pathlib.Path
156
        The dbSNP input file
157
    sender : str
158
        The SNP sender (ex. AGR_BS, IGGC).
159
    all_snp_names : set[str]
160
        A set containing all the SNP names than need to be updated.
161
    supported_chips : list[str]
162
        A list of supported chips (required to collect the original sequence).
163

164
    Returns
165
    -------
166
    None.
167

168
    """
169

170
    global VariantSpecie
171

172
    logger.info(f"Reading from '{input_file}'")
1✔
173

174
    handle_filter = partial(search_chip_snps, handle=sender)
1✔
175

176
    # cicle amoung dbsnp object
177
    for i, snp in enumerate(filter(handle_filter, read_dbSNP(input_file))):
1✔
178
        if (i+1) % 5000 == 0:
1✔
179
            logger.info(f"{i+1} variants processed for '{input_file}'")
×
180

181
        # determine rs_id once
182
        rs_id = f"rs{snp['rsId']}"
1✔
183

184
        # now get only the SS objects with the required handle
185
        sss = list(filter(lambda ss: ss['handle'] == sender, snp['ss']))
1✔
186

187
        # test for locSnpId in my database
188
        locSnpIds = set([ss['locSnpId'] for ss in sss])
1✔
189

190
        # Skip variants not in database
191
        if not locSnpIds.intersection(all_snp_names):
1✔
192
            logger.debug(f"Skipping '{locSnpIds}': not in database")
1✔
193
            continue
1✔
194

195
        variants = search_variant(sss, rs_id, locSnpIds, VariantSpecie)
1✔
196

197
        for variant in variants:
1✔
198
            location = process_variant(snp, variant, supported_chips)
1✔
199

200
            # Should I update a location or not?
201
            update_variant = False
1✔
202

203
            variant, updated = update_location(location, variant)
1✔
204

205
            if updated:
1✔
206
                update_variant = True
1✔
207

208
            variant, updated = update_rs_id(
1✔
209
                # create a fake variant with rs_id to use this method
210
                VariantSpecie(rs_id=[rs_id]),
211
                variant)
212

213
            if updated:
1✔
214
                update_variant = True
×
215

216
            if update_variant:
1✔
217
                # update variant with snpchimp data
218
                variant.save()
1✔
219

220
    logger.info(f"{i+1} variants processed for '{input_file}'")
1✔
221

222

223
@click.command()
1✔
224
@click.option(
1✔
225
    '--species_class',
226
    type=str,
227
    required=True,
228
    help="The generic species of dbSNP data (Sheep or Goat)"
229
)
230
@click.option(
1✔
231
    '--input_dir',
232
    'input_dir',
233
    type=click.Path(exists=True),
234
    required=True,
235
    help="The directory with dbSNP input (XML) files"
236
)
237
@click.option(
1✔
238
    '--pattern',
239
    type=str,
240
    default="*.gz",
241
    help="The directory with dbSNP input (XML) files",
242
    show_default=True
243
)
244
@click.option(
1✔
245
    '--sender',
246
    type=str,
247
    required=True,
248
    help="The SNP sender (ex. AGR_BS, IGGC)"
249
)
250
@click.option(
1✔
251
    '--version',
252
    type=str,
253
    required=True,
254
    help="The assembly version"
255
)
256
@click.option(
1✔
257
    '--imported_from',
258
    type=str,
259
    default="dbSNP152",
260
    help="The source of this data"
261
)
262
def main(species_class, input_dir, pattern, sender, version, imported_from):
1✔
263
    global VariantSpecie
264
    global assembly_conf
265

266
    # determine assembly configuration
267
    assembly_conf = AssemblyConf(version=version, imported_from=imported_from)
1✔
268

269
    # determining the proper VariantSpecies class
270
    VariantSpecie = get_variant_species(species_class)
1✔
271

272
    logger.info("Search all snp names in database")
1✔
273

274
    # determine the supported chips names
275
    supported_chips = SupportedChip.objects.filter(
1✔
276
        manufacturer="illumina",
277
        species=species_class.capitalize(),
278
    )
279
    supported_chips = [chip.name for chip in supported_chips]
1✔
280

281
    logger.debug(f"Considering '{supported_chips}' chips")
1✔
282

283
    all_snp_names = set([
1✔
284
        variant.name for variant in VariantSpecie.objects.filter(
285
            chip_name__in=supported_chips).fields(name=1)
286
    ])
287

288
    logger.info(f"Got {len(all_snp_names)} SNPs for 'illumina' manufacturer")
1✔
289

290
    for input_file in pathlib.Path(input_dir).glob(pattern):
1✔
291
        process_dbsnp_file(input_file, sender, all_snp_names, supported_chips)
1✔
292

293
    logger.info("Completed")
1✔
294

295

296
if __name__ == '__main__':
1✔
297
    log_fmt = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
×
298
    logging.basicConfig(level=logging.INFO, format=log_fmt)
×
299
    logging.getLogger('src.features.dbsnp').setLevel(logging.INFO)
×
300
    logger = logging.getLogger(__name__)
×
301

302
    # connect to database
303
    global_connection()
×
304

305
    main()
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc