• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

HEPData / hepdata-cli / 17156633267

22 Aug 2025 01:29PM UTC coverage: 97.143% (+0.03%) from 97.115%
17156633267

Pull #7

github

web-flow
Merge b6717acc7 into 799b6ea14
Pull Request #7: Add support for yoda-H5

7 of 7 new or added lines in 3 files covered. (100.0%)

1 existing line in 1 file now uncovered.

204 of 210 relevant lines covered (97.14%)

0.97 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.58
/hepdata_cli/api.py
1
# -*- coding: utf-8 -*-
2

3
from .version import __version__
1✔
4
from .resilient_requests import resilient_requests
1✔
5

6
import tarfile
1✔
7
import sys
1✔
8
import re
1✔
9
import os
1✔
10
import errno
1✔
11

12
SITE_URL = "https://www.hepdata.net"
1✔
13
# SITE_URL = "http://127.0.0.1:5000"
14

15
UPLOAD_MAX_SIZE = 52000000  # Upload limit in bytes
1✔
16
ALLOWED_FORMATS = ['csv', 'root', 'yaml', 'yoda', 'yoda1', 'yoda.h5', 'json']
1✔
17

18
MAX_MATCHES, MATCHES_PER_PAGE = (10000, 10) if "pytest" not in sys.modules else (144, 12)
1✔
19

20

21
class Client(object):
    """API class to handle all queries to HEPData."""

    def __init__(self, verbose=False):
        """
        Initialises the client object.

        :param verbose: prints additional output.
        """
        self.verbose = verbose
        self.version = __version__
        # Fail fast if the HEPData service is unreachable.
        resilient_requests('get', SITE_URL + '/ping')

    def find(self, query, keyword=None, ids=None, max_matches=MAX_MATCHES, matches_per_page=MATCHES_PER_PAGE):
        """
        Search function for the hepdata database. Calls hepdata.net search function.

        :param query: string passed to hepdata.net search function. See advanced search tips at hepdata.net.
        :param keyword: filters return dictionary for given keyword. Exact match is first attempted, otherwise partial match is accepted.
        :param ids: accepts one of ("arxiv", "inspire", "hepdata").
        :param max_matches: upper bound on the total number of matches retrieved.
        :param matches_per_page: page size used when paging through search results.

        :return: returns a list of (filtered if 'keyword' is specified) dictionaries for the search matches. If 'ids' is specified it instead returns a list of ids as a string.
        """
        find_results = []
        # hepdata.net pages are 1-indexed; stop once max_matches would be exceeded.
        for page in range(1, max_matches // matches_per_page + 1):
            response = self._query(query, page=page, size=matches_per_page)
            data = response.json()
            if len(data['results']) == 0:
                break
            elif keyword is None and ids is None:
                # return full list of dictionaries
                find_results += data['results']
            else:
                assert ids in [None, "arxiv", "inspire", "hepdata", "id"], "allowed ids are: arxiv, inspire and hepdata"
                if ids is not None:
                    # hepdata record ids are stored under the 'id' key in the results
                    if ids == "hepdata":
                        ids = "id"
                    keyword = ids
                # return specific dictionary entry (exact match)
                if any([keyword in result.keys() for result in data['results']]):
                    if ids is None:
                        find_results += [{keyword: result[keyword]} for result in data['results'] if keyword in result.keys()]
                    else:
                        find_results += [str(result[keyword]).replace("arXiv:", "") for result in data['results'] if keyword in result.keys()]
                # return specific dictionary entry (partial match)
                elif any([any([keyword in key for key in result.keys()]) for result in data['results']]):
                    if ids is None:
                        find_results += [{key: result[key] for key in result.keys() if keyword in key} for result in data['results']]
                    else:
                        find_results += [[str(result[key]).replace("arXiv:", "") for key in result.keys() if keyword in key][0]
                                         if len([result[key] for key in result.keys() if keyword in key]) > 0 else "" for result in data['results']]
            if len(data['results']) < matches_per_page:
                # Short page means this was the last page of results.
                break
        if ids is None:
            return find_results
        else:
            return ' '.join(find_results)

    def download(self, id_list, file_format=None, ids=None, table_name='', download_dir='./hepdata-downloads'):
        """
        Downloads from the hepdata database the specified records.

        :param id_list: list of ids to download. These can be obtained by the find function.
        :param file_format: accepts one of ('csv', 'root', 'yaml', 'yoda', 'yoda1', 'yoda.h5', 'json'). Specifies the download file format.
        :param ids: accepts one of ('inspire', 'hepdata'). It specifies what type of ids have been passed.
        :param table_name: restricts download to specific tables.
        :param download_dir: defaults to ./hepdata-downloads. Specifies where to download the files.
        """
        urls = self._build_urls(id_list, file_format, ids, table_name)
        for url in urls:
            if self.verbose is True:
                print("Downloading: " + url)
            download_url(url, download_dir)

    def fetch_names(self, id_list, ids=None):
        """
        Returns the names of the tables in the provided records. These are the possible inputs of table_name parameter in download function.

        :param id_list: list of id of records of which to return table names.
        :param ids: accepts one of ('inspire', 'hepdata'). It specifies what type of ids have been passed.

        :return: a list (one entry per record) of lists of table names.
        """
        urls = self._build_urls(id_list, 'json', ids, '')
        table_names = []
        for url in urls:
            response = resilient_requests('get', url)
            json_dict = response.json()
            table_names += [[data_table['name'] for data_table in json_dict['data_tables']]]
        return table_names

    def upload(self, path_to_file, email, recid=None, invitation_cookie=None, sandbox=True, password=None):
        """
        Upload record.

        :param path_to_file: path of file to be uploaded.
        :param email: email address of existing HEPData user.
        :param recid: HEPData ID (not the INSPIRE ID) of an existing record.
        :param invitation_cookie: token sent in the invitation email for a non-sandbox record.
        :param sandbox: True (default) or False if the file should be uploaded to the sandbox.
        :param password: password of existing HEPData user (prompt if not specified).
        """
        file_size = os.path.getsize(path_to_file)
        assert file_size < UPLOAD_MAX_SIZE,\
            '{} too large ({} bytes > {} bytes)'.format(path_to_file, file_size, UPLOAD_MAX_SIZE)
        data = {'email': email, 'recid': recid, 'invitation_cookie': invitation_cookie, 'sandbox': sandbox, 'pswd': password}
        # Context manager ensures the archive handle is closed even if the request fails.
        with open(path_to_file, 'rb') as archive:
            files = {'hep_archive': archive}
            resilient_requests('post', SITE_URL + '/record/cli_upload', data=data, files=files)
        # print upload location
        if sandbox is True and recid is None:
            print('Uploaded ' + path_to_file + ' to a new record at ' + SITE_URL + '/record/sandbox')
        elif sandbox is True and recid is not None:
            print('Uploaded ' + path_to_file + ' to ' + SITE_URL + '/record/sandbox/' + str(recid))
        else:
            print('Uploaded ' + path_to_file + ' to ' + SITE_URL + '/record/' + str(recid))

    def _build_urls(self, id_list, file_format, ids, table_name):
        """Builds urls for download and fetch_names, given the specified parameters."""
        if type(id_list) not in (tuple, list):
            id_list = id_list.split()
        assert len(id_list) > 0, 'Ids are required.'
        assert file_format in ALLOWED_FORMATS, f"allowed formats are: {ALLOWED_FORMATS}"
        assert ids in ['inspire', 'hepdata'], "allowed ids are: inspire and hepdata."
        if table_name == '':
            params = {'format': file_format}
        else:
            params = {'format': file_format, 'table': table_name}
        # inspire ids carry an 'ins' prefix in record urls; the replace undoes
        # double percent-encoding picked up through redirects.
        urls = [resilient_requests('get', SITE_URL + '/record/' + ('ins' if ids == 'inspire' else '') + id_entry, params=params).url.replace('%2525', '%25') for id_entry in id_list]
        return urls

    def _query(self, query, page, size):
        """Builds the search query passed to hepdata.net."""
        url = SITE_URL + '/search/?q=' + query + '&format=json&page=' + str(page) + '&size=' + str(size)
        response = resilient_requests('get', url)
        if self.verbose is True:
            print('Looking up: ' + url)
        return response
161

162

163
def mkdir(directory):
    """Create *directory* (including parents) if it does not already exist.

    :param directory: path of the directory to create.
    :raises OSError: if creation fails for any reason other than the
        directory already existing.
    """
    if not os.path.exists(directory):
        try:
            os.makedirs(directory)
        except OSError as exc:
            # Guard against race condition (directory created between
            # os.path.exists and os.makedirs). Re-raise the original
            # exception instead of a bare `Exception` so callers keep
            # the errno and message.
            if exc.errno != errno.EEXIST:
                raise
170

171

172
def download_url(url, download_dir):
    """Download file and if necessary extract it.

    :param url: url of the downloadable resource.
    :param download_dir: directory in which to save (and extract) the file.
    """
    assert is_downloadable(url), "Given url is not downloadable: {}".format(url)
    response = resilient_requests('get', url, allow_redirects=True)
    if url[-4:] == 'json':
        # json responses carry no content-disposition; derive a name from the url.
        filename = 'HEPData-' + url.split('/')[-1].split("?")[0] + ".json"
    else:
        filename = getFilename_fromCd(response.headers.get('content-disposition'))
    if filename[0] == '"' and filename[-1] == '"':
        filename = filename[1:-1]
    filepath = download_dir + "/" + filename
    mkdir(os.path.dirname(filepath))
    # Context manager ensures the file handle is flushed and closed.
    with open(filepath, 'wb') as out_file:
        out_file.write(response.content)
    if filepath.endswith("tar.gz") or filepath.endswith("tar"):
        # NOTE(review): extractall trusts archive member paths; HEPData is the
        # only expected source, but consider a path-traversal check if urls
        # can come from elsewhere.
        with tarfile.open(filepath, "r:gz" if filepath.endswith("tar.gz") else "r:") as tar:
            tar.extractall(path=os.path.dirname(filepath))
        os.remove(filepath)
190

191

192
def getFilename_fromCd(cd):
    """Get filename from content-disposition.

    :param cd: value of the content-disposition header (may be None).
    :return: the filename substring, or None when the header is absent
        or carries no ``filename=`` field.
    """
    if not cd:
        return None
    matches = re.findall('filename=(.+)', cd)
    return matches[0] if matches else None
200

201

202
def is_downloadable(url):
    """Does the url contain a downloadable resource?

    :param url: url to probe with a HEAD request.
    :return: False if the response advertises an html content-type
        (i.e. a landing or error page), True otherwise.
    """
    header = resilient_requests('head', url, allow_redirects=True).headers
    content_type = header.get('content-type')
    # Guard against a missing content-type header: .get returns None and
    # None.lower() would raise AttributeError. Treat absent as downloadable.
    if content_type is not None and 'html' in content_type.lower():
        return False
    return True
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc