• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

cnr-ibba / SMARTER-database / 3757087739

pending completion
3757087739

Pull #77

github

GitHub
Merge 5b61fbfef into c1f1b3641
Pull Request #77: :bookmark: release v0.4.7

272 of 272 new or added lines in 11 files covered. (100.0%)

2697 of 2899 relevant lines covered (93.03%)

0.93 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.33
/src/data/SNPconvert.py
1
#!/usr/bin/env python3
2
# -*- coding: utf-8 -*-
3
"""
1✔
4
Created on Mon Oct 18 13:55:46 2021
5

6
@author: Paolo Cozzi <paolo.cozzi@ibba.cnr.it>
7
"""
8

9
import click
1✔
10
import logging
1✔
11
import tempfile
1✔
12
import subprocess
1✔
13

14
from pathlib import Path
1✔
15
from click_option_group import (
1✔
16
    optgroup, RequiredMutuallyExclusiveOptionGroup,
17
    MutuallyExclusiveOptionGroup)
18

19
from src.features.smarterdb import (
1✔
20
    Dataset, global_connection, SmarterDBException)
21
from src.features.plinkio import TextPlinkIO, IlluminaReportIO, BinaryPlinkIO
1✔
22
from src.data.common import WORKING_ASSEMBLIES, PLINK_SPECIES_OPT, AssemblyConf
1✔
23

24
# Get an instance of a logger
25
logger = logging.getLogger(__name__)
1✔
26

27

28
class CustomMixin():
1✔
29
    def _process_pedline(
1✔
30
            self,
31
            line: list,
32
            dataset: Dataset,
33
            coding: str,
34
            create_sample: bool = False,
35
            sample_field: str = "original_id",
36
            ignore_coding_errors: bool = False):
37

38
        self._check_file_sizes(line)
1✔
39

40
        logger.debug(f"Processing {line[:10]+ ['...']}")
1✔
41

42
        # a new line obj
43
        new_line = line.copy()
1✔
44

45
        # check and fix genotypes if necessary
46
        new_line = self._process_genotypes(
1✔
47
            new_line,
48
            coding,
49
            ignore_coding_errors)
50

51
        # need to remove filtered snps from ped line
52
        for index in sorted(self.filtered, reverse=True):
1✔
53
            # index is snp position. Need to delete two fields
54
            del new_line[6+index*2+1]
1✔
55
            del new_line[6+index*2]
1✔
56

57
        return new_line
1✔
58

59

60
class CustomTextPlinkIO(CustomMixin, TextPlinkIO):
1✔
61
    pass
1✔
62

63

64
class CustomBinaryPlinkIO(CustomMixin, BinaryPlinkIO):
1✔
65
    pass
1✔
66

67

68
class CustomIlluminaReportIO(CustomMixin, IlluminaReportIO):
1✔
69
    def read_reportfile(
1✔
70
            self, breed="0", dataset: Dataset = None, *args, **kwargs):
71
        """Custom Open illumina report returns iterator"""
72

73
        logger.debug("Custom 'read_reportfile' called")
1✔
74

75
        return super().read_reportfile(breed, dataset, *args, **kwargs)
1✔
76

77

78
def get_output_files(prefix: str, assembly: str):
1✔
79
    # create output directory
80
    working_dir = tempfile.mkdtemp()
1✔
81
    output_dir = Path(working_dir)
1✔
82

83
    # determine map outputfile. get the basename of the prefix
84
    prefix = Path(prefix)
1✔
85

86
    output_map = f"{prefix.name}_updated.map"
1✔
87
    output_map = output_dir / output_map
1✔
88

89
    # determine ped outputfile
90
    output_ped = f"{prefix.name}_updated.ped"
1✔
91
    output_ped = output_dir / output_ped
1✔
92

93
    return output_dir, output_map, output_ped
1✔
94

95

96
def deal_with_text_plink(file_: str, assembly: str, species: str):
1✔
97
    mapfile = file_ + ".map"
1✔
98
    pedfile = file_ + ".ped"
1✔
99

100
    # instantiating a TextPlinkIO object
101
    plinkio = CustomTextPlinkIO(
1✔
102
        mapfile=mapfile,
103
        pedfile=pedfile,
104
        species=species
105
    )
106

107
    plinkio.read_mapfile()
1✔
108

109
    # determine output files
110
    output_dir, output_map, output_ped = get_output_files(file_, assembly)
1✔
111

112
    return plinkio, output_dir, output_map, output_ped
1✔
113

114

115
def deal_with_binary_plink(bfile: str, assembly: str, species: str):
1✔
116
    # instantiating a BinaryPlinkIO object
117
    plinkio = CustomBinaryPlinkIO(
1✔
118
        prefix=bfile,
119
        species=species
120
    )
121

122
    plinkio.read_mapfile()
1✔
123

124
    # determine output files
125
    output_dir, output_map, output_ped = get_output_files(bfile, assembly)
1✔
126

127
    return plinkio, output_dir, output_map, output_ped
1✔
128

129

130
def deal_with_illumina(
1✔
131
        report: str, snpfile: str, assembly: str, species: str):
132
    plinkio = CustomIlluminaReportIO(
1✔
133
        snpfile=snpfile,
134
        report=report,
135
        species=species,
136
    )
137

138
    plinkio.read_snpfile()
1✔
139

140
    # determine output files
141
    output_dir, output_map, output_ped = get_output_files(
1✔
142
        Path(report).stem, assembly)
143

144
    return plinkio, output_dir, output_map, output_ped
1✔
145

146

147
@click.command()
1✔
148
@optgroup.group(
1✔
149
    'Plink input parameters',
150
    cls=RequiredMutuallyExclusiveOptionGroup
151
)
152
@optgroup.option(
1✔
153
    '--file',
154
    'file_',
155
    type=str,
156
    help="PLINK text file prefix")
157
@optgroup.option(
1✔
158
    '--bfile',
159
    type=str,
160
    help="PLINK binary file prefix")
161
@optgroup.option(
1✔
162
    '--report',
163
    type=str,
164
    help="The illumina report file")
165
@click.option(
1✔
166
    '--snpfile',
167
    type=str,
168
    help="The illumina SNPlist file")
169
@click.option(
1✔
170
    '--coding',
171
    type=click.Choice(
172
        ['top', 'forward', 'ab'],
173
        case_sensitive=False),
174
    default="top", show_default=True,
175
    help="Illumina coding format"
176
)
177
@click.option(
1✔
178
    '--assembly',
179
    type=str,
180
    required=True,
181
    help="Destination assembly of the converted genotypes")
182
@click.option(
1✔
183
    '--species',
184
    type=str,
185
    required=True,
186
    help="The SMARTER assembly species (Goat or Sheep)")
187
@click.option(
1✔
188
    '--results_dir',
189
    type=str,
190
    required=True,
191
    help="Where results will be saved")
192
@click.option(
1✔
193
    '--chip_name',
194
    type=str,
195
    help="The SMARTER SupportedChip name")
196
@optgroup.group(
1✔
197
    'Variant Search Type',
198
    cls=MutuallyExclusiveOptionGroup
199
)
200
@optgroup.option(
1✔
201
    '--search_field',
202
    type=str,
203
    default="name",
204
    show_default=True,
205
    help='search variants using this field')
206
@optgroup.option(
1✔
207
    '--search_by_positions',
208
    is_flag=True,
209
    help='search variants using their positions')
210
@click.option(
1✔
211
    '--src_version',
212
    type=str,
213
    help="Source assembly version")
214
@click.option(
1✔
215
    '--src_imported_from',
216
    type=str,
217
    help="Source assembly imported_from")
218
@click.option(
1✔
219
    '--ignore_coding_errors',
220
    is_flag=True,
221
    help=(
222
        'set SNP as missing when there are coding errors '
223
        '(no more CodingException)'))
224
def main(file_, bfile, report, snpfile, coding, assembly, species, chip_name,
1✔
225
         results_dir, search_field, search_by_positions, src_version,
226
         src_imported_from, ignore_coding_errors):
227
    """
228
    Convert a PLINK/Illumina report file in a SMARTER-like ouput file, without
229
    inserting data in SMARTER-database. Useful to convert data relying on
230
    SMARTER-database for private datasets (data which cannot be included in
231
    SMARTER-database)
232
    """
233

234
    logger.info(f"{Path(__file__).name} started")
1✔
235

236
    # find assembly configuration
237
    if assembly not in WORKING_ASSEMBLIES:
1✔
238
        raise SmarterDBException(
1✔
239
            f"assembly {assembly} not managed by smarter")
240

241
    src_assembly, dst_assembly = None, None
1✔
242

243
    if src_version and src_imported_from:
1✔
244
        src_assembly = AssemblyConf(src_version, src_imported_from)
1✔
245
        logger.info(f"Got '{src_assembly} as source assembly'")
1✔
246
        dst_assembly = WORKING_ASSEMBLIES[assembly]
1✔
247

248
    else:
249
        src_assembly = WORKING_ASSEMBLIES[assembly]
1✔
250

251
    if file_:
1✔
252
        plinkio, output_dir, output_map, output_ped = deal_with_text_plink(
1✔
253
            file_, assembly, species)
254

255
    elif bfile:
1✔
256
        plinkio, output_dir, output_map, output_ped = deal_with_binary_plink(
1✔
257
            bfile, assembly, species)
258

259
    elif report:
1✔
260
        if not snpfile:
1✔
261
            raise RuntimeError(f"Missing snpfile for report {report}")
1✔
262

263
        plinkio, output_dir, output_map, output_ped = deal_with_illumina(
1✔
264
            report, snpfile, assembly, species)
265

266
    # ok check for results dir
267
    results_dir = Path(results_dir)
1✔
268
    results_dir.mkdir(parents=True, exist_ok=True)
1✔
269

270
    # define final filename
271
    final_prefix = results_dir / output_ped.stem
1✔
272

273
    # if I arrive here, I can create output files
274

275
    # fetch coordinates relying assembly configuration
276
    if search_by_positions:
1✔
277
        # fetch variants relying positions
278
        plinkio.fetch_coordinates_by_positions(
1✔
279
            src_assembly=src_assembly,
280
            dst_assembly=dst_assembly
281
        )
282

283
    else:
284
        # fetch coordinates relying assembly configuration
285
        plinkio.fetch_coordinates(
1✔
286
            src_assembly=src_assembly,
287
            dst_assembly=dst_assembly,
288
            search_field=search_field,
289
            chip_name=chip_name
290
        )
291

292
    logger.info("Writing a new map file with updated coordinates")
1✔
293
    plinkio.update_mapfile(str(output_map))
1✔
294

295
    logger.info("Writing a new ped file with fixed genotype")
1✔
296
    plinkio.update_pedfile(
1✔
297
        outputfile=output_ped,
298
        dataset=None,
299
        coding=coding,
300
        create_samples=False,
301
        ignore_coding_errors=ignore_coding_errors
302
    )
303

304
    # ok time to convert data in plink binary format
305
    cmd = ["plink"] + PLINK_SPECIES_OPT[species] + [
1✔
306
        "--file",
307
        f"{output_dir / output_ped.stem}",
308
        "--make-bed",
309
        "--out",
310
        f"{final_prefix}"
311
    ]
312

313
    # debug
314
    logger.info("Executing: " + " ".join(cmd))
1✔
315

316
    subprocess.run(cmd, check=True)
1✔
317

318
    logger.info(f"{Path(__file__).name} ended")
1✔
319

320

321
if __name__ == '__main__':
1✔
322
    log_fmt = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
×
323
    logging.basicConfig(level=logging.INFO, format=log_fmt)
×
324

325
    # connect to database
326
    global_connection()
×
327

328
    # call main function
329
    main()
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc