• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

openvax / pyensembl / 25776010820

13 May 2026 03:14AM UTC coverage: 84.971% (+0.06%) from 84.907%
25776010820

push

github

web-flow
Fix #335 (part 2): tolerate versioned IDs in protein/transcript FASTA lookups (#350)

GENCODE GTFs embed the assembly version directly in the protein_id and
transcript_id attributes (e.g. protein_id "ENSP00000123456.3"), while
pyensembl's FASTA parser strips ENS.N suffixes from headers. The literal
lookup with the versioned ID therefore missed, and Transcript.protein_sequence
returned None for GENCODE FASTAs even when the sequence was present.

Adds sequence_data.sequence_lookup_with_ens_fallback() which tries the
literal id first (handles both unversioned Ensembl GTFs and any future
FASTA that preserves versions) and falls back to the version-stripped
form for ENS.N identifiers.

Wires the helper into:
- Transcript.sequence (previously stripped unconditionally; now tries
  versioned first then strips)
- Transcript.protein_sequence (previously missing for GENCODE GTFs)
- Genome.transcript_sequence(transcript_id)
- Genome.protein_sequence(protein_id)

Bump version to 2.9.6.

16 of 18 new or added lines in 4 files covered. (88.89%)

1 existing line in 1 file now uncovered.

1600 of 1883 relevant lines covered (84.97%)

3.4 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.9
/pyensembl/sequence_data.py
1
# Licensed under the Apache License, Version 2.0 (the "License");
2
# you may not use this file except in compliance with the License.
3
# You may obtain a copy of the License at
4
#
5
#     http://www.apache.org/licenses/LICENSE-2.0
6
#
7
# Unless required by applicable law or agreed to in writing, software
8
# distributed under the License is distributed on an "AS IS" BASIS,
9
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10
# See the License for the specific language governing permissions and
11
# limitations under the License.
12

13
from os import remove
4✔
14
from os.path import dirname, exists, abspath, split, join
4✔
15

16
import datacache
4✔
17
import logging
4✔
18
from collections import Counter
4✔
19
import pickle
4✔
20
from .common import load_pickle, dump_pickle
4✔
21
from .fasta import parse_fasta_dictionary
4✔
22

23

24
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
4✔
25

26

27
def sequence_lookup_with_ens_fallback(sequence_data, identifier):
    """
    Fetch the sequence for ``identifier`` from ``sequence_data``, retrying
    without the version suffix when an Ensembl-style ID is not found.

    The exact identifier is always tried first. Only if that lookup yields
    ``None`` and the identifier both starts with ``"ENS"`` and contains a
    ``"."`` is everything from the last ``"."`` onward dropped (e.g.
    ``"ENSP00000123456.3"`` -> ``"ENSP00000123456"``) and the lookup repeated.

    This exists because pyensembl's FASTA parser strips ENS.N suffixes from
    headers (so dictionary keys are unversioned), whereas GENCODE GTFs keep
    the version in their ``protein_id`` / ``transcript_id`` attributes.

    ``sequence_data`` may be any object exposing ``get(key)`` that returns
    ``None`` for a missing key. Returns the sequence, or ``None`` when the
    identifier is empty/None or neither lookup succeeds.
    """
    if identifier:
        exact_match = sequence_data.get(identifier)
        if exact_match is not None:
            return exact_match
        if identifier.startswith("ENS"):
            # rpartition splits on the LAST dot; an empty separator means
            # the identifier carried no version suffix to strip.
            unversioned, separator, _version = identifier.rpartition(".")
            if separator:
                return sequence_data.get(unversioned)
    return None
×
47

48

49
class SequenceData(object):
    """
    Container for reference nucleotide and amino acid sequences.

    Sequences are parsed from one or more FASTA files and merged into a
    single dictionary keyed by sequence identifier. The parsed dictionary of
    each FASTA file is cached as a ``<fasta filename>.pickle`` file and
    reused on later loads when that file exists and can be read.
    """

    def __init__(self, fasta_paths, cache_directory_path=None):
        """
        Parameters
        ----------
        fasta_paths : str or list of str
            Path(s) to FASTA file(s); a single string is treated as a
            one-element list. Paths are made absolute.

        cache_directory_path : str, optional
            Directory used for every pickle cache file. When omitted (or
            falsy) each pickle is placed next to its FASTA file.

        Raises
        ------
        ValueError
            If any of the FASTA paths does not exist.
        """
        # isinstance (rather than an exact type check) so that str
        # subclasses are also treated as a single path instead of being
        # iterated character by character.
        if isinstance(fasta_paths, str):
            fasta_paths = [fasta_paths]

        self.fasta_paths = [abspath(path) for path in fasta_paths]
        self.fasta_directory_paths = [split(path)[0] for path in self.fasta_paths]
        self.fasta_filenames = [split(path)[1] for path in self.fasta_paths]
        if cache_directory_path:
            self.cache_directory_paths = [cache_directory_path] * len(self.fasta_paths)
        else:
            self.cache_directory_paths = self.fasta_directory_paths
        for path in self.fasta_paths:
            if not exists(path):
                raise ValueError("Couldn't find FASTA file %s" % (path,))
        self.fasta_dictionary_filenames = [
            filename + ".pickle" for filename in self.fasta_filenames
        ]
        self.fasta_dictionary_pickle_paths = [
            join(cache_path, filename)
            for cache_path, filename in zip(
                self.cache_directory_paths, self.fasta_dictionary_filenames
            )
        ]
        self._init_lazy_fields()

    def _init_lazy_fields(self):
        """Reset the lazily computed in-memory state to "not loaded"."""
        self._fasta_dictionary = None
        self._fasta_keys = None

    def clear_cache(self):
        """Drop the in-memory dictionary and delete any pickle cache files."""
        self._init_lazy_fields()
        for path in self.fasta_dictionary_pickle_paths:
            if exists(path):
                remove(path)

    def __str__(self):
        return "SequenceData(fasta_paths=%s)" % (self.fasta_paths,)

    def __repr__(self):
        return str(self)

    def __contains__(self, sequence_id):
        """Return True if ``sequence_id`` is a key of the FASTA dictionary."""
        if self._fasta_keys is None:
            # Built once on first membership test, reset by clear_cache().
            self._fasta_keys = set(self.fasta_dictionary.keys())
        return sequence_id in self._fasta_keys

    def __eq__(self, other):
        # test to see if self.fasta_paths and other.fasta_paths contain
        # the same list of paths, regardless of order
        return (other.__class__ is SequenceData) and Counter(
            self.fasta_paths
        ) == Counter(other.fasta_paths)

    def __hash__(self):
        # self.fasta_paths is a list, which is unhashable, so hash an
        # immutable view of it instead. Sorting makes the hash independent
        # of path order, matching the order-insensitive __eq__ above.
        return hash(tuple(sorted(self.fasta_paths)))

    def _add_to_fasta_dictionary(self, fasta_dictionary_tmp):
        """
        Merge ``fasta_dictionary_tmp`` into the combined dictionary.

        Identifiers already present keep their first sequence; the duplicate
        is skipped with a warning.
        """
        for identifier, sequence in fasta_dictionary_tmp.items():
            if identifier in self._fasta_dictionary:
                logger.warning(
                    "Sequence identifier %s is duplicated in your FASTA files!",
                    identifier,
                )
                continue
            self._fasta_dictionary[identifier] = sequence

    def _load_or_create_fasta_dictionary_pickle(self):
        """
        (Re)build the combined dictionary from all FASTA files.

        For each FASTA file the pickle cache is used when it exists and
        loads successfully; otherwise the FASTA file is parsed and its
        dictionary is written to the pickle path for next time.
        """
        self._fasta_dictionary = dict()
        for fasta_path, pickle_path in zip(
            self.fasta_paths, self.fasta_dictionary_pickle_paths
        ):
            if exists(pickle_path):
                # try loading the cached file
                # but we'll fall back on recreating it if loading fails
                # NOTE(review): presumably load_pickle uses the pickle
                # module, so the cache directory must be trusted --
                # unpickling can execute arbitrary code. Confirm.
                try:
                    fasta_dictionary_tmp = load_pickle(pickle_path)
                    self._add_to_fasta_dictionary(fasta_dictionary_tmp)
                    logger.info("Loaded sequence dictionary from %s", pickle_path)
                    continue
                except (
                    pickle.UnpicklingError,
                    AttributeError,
                    EOFError,
                    ImportError,
                    IndexError,
                ):
                    # UnpicklingError / EOFError / IndexError cover corrupt
                    # or truncated cache files; AttributeError / ImportError
                    # result from pickled objects referring to classes or
                    # modules that no longer exist.
                    logger.warning(
                        "Failed to load %s, attempting to read FASTA directly",
                        pickle_path,
                    )
            logger.info("Parsing sequences from FASTA file at %s", fasta_path)

            fasta_dictionary_tmp = parse_fasta_dictionary(fasta_path)
            self._add_to_fasta_dictionary(fasta_dictionary_tmp)
            logger.info("Saving sequence dictionary to %s", pickle_path)
            datacache.ensure_dir(dirname(pickle_path))
            dump_pickle(fasta_dictionary_tmp, pickle_path)

    def index(self, overwrite=False):
        """
        Load the sequence dictionary, creating pickle caches as needed.

        With ``overwrite=True`` existing caches are deleted first so that
        every FASTA file is parsed again.
        """
        if overwrite:
            self.clear_cache()
        self._load_or_create_fasta_dictionary_pickle()

    @property
    def fasta_dictionary(self):
        """Combined identifier -> sequence dictionary, loaded on first use."""
        # Compare against None rather than truthiness: a legitimately empty
        # dictionary counts as loaded and must not trigger a re-load on
        # every access.
        if self._fasta_dictionary is None:
            self._load_or_create_fasta_dictionary_pickle()
        return self._fasta_dictionary

    def get(self, sequence_id):
        """Get sequence associated with given ID or return None if missing"""
        return self.fasta_dictionary.get(sequence_id)
4✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc