• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Clinical-Genomics / microSALT / #327

01 Apr 2025 09:36AM UTC coverage: 31.562% (+0.04%) from 31.526%
#327

Pull #204

travis-ci

web-flow
Apply suggestions from code review
Pull Request #204: microSALT - v4.2.0

30 of 140 new or added lines in 7 files covered. (21.43%)

18 existing lines in 5 files now uncovered.

956 of 3029 relevant lines covered (31.56%)

0.32 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

18.18
/microSALT/utils/pubmlst/authentication.py
1
import json
1✔
2
import os
1✔
3
from datetime import datetime, timedelta
1✔
4

5
from dateutil import parser
1✔
6
from rauth import OAuth1Session
1✔
7

8
from microSALT import logger
1✔
9
from microSALT.utils.pubmlst.exceptions import (
1✔
10
    PUBMLSTError,
11
    SessionTokenRequestError,
12
    SessionTokenResponseError,
13
)
14
from microSALT.utils.pubmlst.helpers import (
1✔
15
    BASE_API,
16
    credentials_path_key,
17
    folders_config,
18
    get_path,
19
    load_auth_credentials,
20
    pubmlst_session_credentials_file_name,
21
    save_session_token,
22
)
23

24
session_token_validity = 12  # Hours; added to datetime.now() to compute a newly fetched token's expiration
1✔
25
session_expiration_buffer = 60  # Seconds; subtracted from the stored expiration before comparing it with now
1✔
26

27

28
def get_new_session_token(db: str):
    """Request a new session token for a specific database and store it.

    Loads the OAuth credentials with ``load_auth_credentials``, sends a signed
    GET request to ``{BASE_API}/db/{db}/oauth/get_session_token`` and, when the
    request succeeds, hands the token to ``save_session_token`` together with
    an expiration of now + ``session_token_validity`` hours.

    Args:
        db: Name of the database the session token is requested for.

    Returns:
        A ``(session_token, session_secret)`` tuple taken from the
        ``oauth_token`` and ``oauth_token_secret`` fields of the response.

    Raises:
        SessionTokenRequestError: The server answered with a non-OK status.
        SessionTokenResponseError: The response body is not valid JSON, is not
            a JSON object, or lacks one of the two token fields.
        PUBMLSTError: Any other failure; the original exception is attached
            as ``__cause__``.
    """
    logger.debug(f"Fetching a new session token for database '{db}'...")

    try:
        consumer_key, consumer_secret, access_token, access_secret = load_auth_credentials()

        url = f"{BASE_API}/db/{db}/oauth/get_session_token"

        session = OAuth1Session(
            consumer_key=consumer_key,
            consumer_secret=consumer_secret,
            access_token=access_token,
            access_token_secret=access_secret,
        )

        response = session.get(url, headers={"User-Agent": "BIGSdb API downloader"})
        logger.debug(f"Response Status Code: {response.status_code}")

        # Guard clause: anything but a successful status is a request error.
        if not response.ok:
            raise SessionTokenRequestError(db, response.status_code, response.text)

        try:
            token_data = response.json()

            # A JSON list/string/number has no ``.get``; report it as a bad
            # response instead of letting an AttributeError surface as an
            # "unexpected" error.
            if not isinstance(token_data, dict):
                raise SessionTokenResponseError(
                    db,
                    "Invalid response format: expected a JSON object, "
                    f"got {type(token_data).__name__}",
                )

            session_token = token_data.get("oauth_token")
            session_secret = token_data.get("oauth_token_secret")

            if not session_token or not session_secret:
                raise SessionTokenResponseError(
                    db, "Missing 'oauth_token' or 'oauth_token_secret' in response."
                )

            expiration_time = datetime.now() + timedelta(hours=session_token_validity)

            save_session_token(db, session_token, session_secret, expiration_time)
            return session_token, session_secret

        except (ValueError, KeyError) as e:
            # ``response.json()`` raises a ValueError subclass on malformed JSON.
            raise SessionTokenResponseError(db, f"Invalid response format: {str(e)}") from e

    except PUBMLSTError as e:
        # Already a domain error: log it and let it propagate unchanged.
        logger.error(f"Error during token fetching: {e}")
        raise
    except Exception as e:
        logger.error(f"Unexpected error: {e}")
        raise PUBMLSTError(
            f"Unexpected error while fetching session token for database '{db}': {e}"
        ) from e
76

77

78
def load_session_credentials(db: str):
    """Return a usable session token for a database, fetching one if needed.

    Reads the session credentials file (located via ``get_path`` and
    ``pubmlst_session_credentials_file_name``) and looks up the entry for
    ``db`` under its ``"databases"`` key. The cached token is reused only when
    the entry is complete and its expiration lies more than
    ``session_expiration_buffer`` seconds in the future. In every other
    recoverable case (no file, no entry, incomplete entry, unreadable
    expiration, expired token) a new token is requested through
    ``get_new_session_token``.

    Args:
        db: Name of the database whose session token is wanted.

    Returns:
        A ``(session_token, session_secret)`` tuple.

    Raises:
        SessionTokenResponseError: The session file exists but is not valid JSON.
        PUBMLSTError: Any other failure, including errors propagated from
            ``get_new_session_token``; unexpected exceptions are wrapped with
            the original attached as ``__cause__``.
    """
    try:
        credentials_file = os.path.join(
            get_path(folders_config, credentials_path_key), pubmlst_session_credentials_file_name
        )

        if not os.path.exists(credentials_file):
            logger.debug("Session file does not exist. Fetching a new session token.")
            return get_new_session_token(db)

        with open(credentials_file, "r") as f:
            try:
                all_sessions = json.load(f)
            except json.JSONDecodeError as e:
                raise SessionTokenResponseError(
                    db, f"Failed to parse session file: {str(e)}"
                ) from e

        db_session_data = all_sessions.get("databases", {}).get(db)
        if not db_session_data:
            logger.debug(
                f"No session token found for database '{db}'. Fetching a new session token."
            )
            return get_new_session_token(db)

        session_token = db_session_data.get("token")
        session_secret = db_session_data.get("secret")
        raw_expiration = db_session_data.get("expiration")

        # An entry missing any of its three fields cannot be used: without the
        # token/secret the caller would receive None values, and without an
        # expiration its validity cannot be checked. Refresh instead of failing.
        if not (session_token and session_secret and raw_expiration):
            logger.debug(
                f"Session entry for database '{db}' is incomplete. Fetching a new session token."
            )
            return get_new_session_token(db)

        try:
            expiration = parser.parse(raw_expiration)
        except (ValueError, OverflowError, TypeError):
            # dateutil raises ParserError (a ValueError) or OverflowError for
            # unparseable strings and TypeError for non-string input.
            logger.debug(
                f"Session expiration for database '{db}' is unreadable. "
                "Fetching a new session token."
            )
            return get_new_session_token(db)

        if datetime.now() < expiration - timedelta(seconds=session_expiration_buffer):
            logger.debug(f"Using existing session token for database '{db}'.")
            return session_token, session_secret

        logger.debug(
            f"Session token for database '{db}' has expired. Fetching a new session token."
        )
        return get_new_session_token(db)

    except PUBMLSTError as e:
        # Already a domain error: log it and let it propagate unchanged.
        logger.error(f"PUBMLST-specific error occurred: {e}")
        raise
    except Exception as e:
        logger.error(f"Unexpected error: {e}")
        raise PUBMLSTError(
            f"Unexpected error while loading session token for database '{db}': {e}"
        ) from e
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc