• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

rero / sonar / 17121500737

21 Aug 2025 08:28AM UTC coverage: 96.245% (-0.1%) from 96.376%
17121500737

push

github

Garfield-fr
feat(document): add latex transformation

* Closes #875.

Co-Authored-by: Bertrand Zuchuat <bertrand.zuchuat@rero.ch>

7945 of 8255 relevant lines covered (96.24%)

0.96 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

99.07
/sonar/modules/cli/utils.py
1
# -*- coding: utf-8 -*-
2
#
3
# Swiss Open Access Repository
4
# Copyright (C) 2021 RERO
5
#
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU Affero General Public License as published by
8
# the Free Software Foundation, version 3 of the License.
9
#
10
# This program is distributed in the hope that it will be useful,
11
# but WITHOUT ANY WARRANTY; without even the implied warranty of
12
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13
# GNU Affero General Public License for more details.
14
#
15
# You should have received a copy of the GNU Affero General Public License
16
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
17

18
"""Utils commands."""
19

20
import json
1✔
21
import pathlib
1✔
22
import shutil
1✔
23
import sys
1✔
24
from os.path import dirname, join
1✔
25

26
import click
1✔
27
from flask import current_app
1✔
28
from flask.cli import with_appcontext
1✔
29
from invenio_db import db
1✔
30
from invenio_files_rest.models import Location
1✔
31
from invenio_oauth2server.cli import process_scopes, process_user
1✔
32
from invenio_oauth2server.models import Client, Token
1✔
33
from invenio_records_rest.utils import obj_or_import_string
1✔
34
from invenio_search.cli import search_version_check
1✔
35
from invenio_search.proxies import current_search
1✔
36
from werkzeug.local import LocalProxy
1✔
37
from werkzeug.security import gen_salt
1✔
38

39
from sonar.modules.api import SonarRecord
1✔
40

41
_datastore = LocalProxy(lambda: current_app.extensions["security"].datastore)
1✔
42

43

44
@click.group()
1✔
45
def utils():
1✔
46
    """Utils commands."""
47

48

49
@utils.command()
1✔
50
@click.option("--force", is_flag=True, default=False)
1✔
51
@with_appcontext
1✔
52
@search_version_check
1✔
53
def es_init(force):
1✔
54
    """Initialize registered templates, aliases and mappings."""
55
    # TODO: to remove once it is fixed in invenio-search module
56
    click.secho("Putting templates...", fg="green", bold=True, file=sys.stderr)
1✔
57
    with click.progressbar(
1✔
58
        current_search.put_templates(ignore=[400] if force else None),
59
        length=len(current_search.templates),
60
    ) as item:
61
        for response in item:
1✔
62
            item.label = response
1✔
63
    click.secho("Creating indexes...", fg="green", bold=True, file=sys.stderr)
1✔
64
    with click.progressbar(
1✔
65
        current_search.create(ignore=[400] if force else None),
66
        length=len(current_search.mappings),
67
    ) as item:
68
        for name, response in item:
1✔
69
            item.label = name
1✔
70

71

72
@utils.command()
1✔
73
@with_appcontext
1✔
74
def clear_files():
1✔
75
    """Remove all files and delete directory from all locations."""
76
    for location in Location.query.all():
1✔
77
        try:
1✔
78
            shutil.rmtree(location.uri)
1✔
79
        except Exception:
1✔
80
            click.secho(f"Directory {location.uri} cannot be cleaned", fg="yellow")
1✔
81

82
    click.secho("Finished", fg="green")
1✔
83

84

85
@utils.command("export")
1✔
86
@click.option("-p", "--pid-type", "pid_type", default="doc")
1✔
87
@click.option("-s", "--serializer", "serializer_key", default="export")
1✔
88
@click.option("-o", "--output-dir", "output_dir", required=True, type=click.File("w"))
1✔
89
@with_appcontext
1✔
90
def export(pid_type, serializer_key, output_dir):
1✔
91
    """Export records for the given record type.
92

93
    :param pid_type: record type
94
    :param output_dir: Output directory
95
    """
96
    click.secho(f'Export "{pid_type}" records in {output_dir.name}')
1✔
97

98
    try:
1✔
99
        # Get the correct record class
100
        record_class = SonarRecord.get_record_class_by_pid_type(pid_type)
1✔
101

102
        if not record_class:
1✔
103
            raise Exception(f'No record class found for type "{pid_type}"')
1✔
104

105
        # Load the serializer
106
        serializer_class = current_app.config.get(
1✔
107
            "SONAR_APP_EXPORT_SERIALIZERS", {}
108
        ).get(pid_type)
109

110
        if serializer_class:
1✔
111
            serializer = obj_or_import_string(serializer_class)()
1✔
112
        else:
113
            serializer = None
1✔
114

115
        pids = record_class.get_all_pids()
1✔
116
        records = []
1✔
117

118
        # Create ouptut directory if not exists
119
        if pids:
1✔
120
            pathlib.Path(output_dir.name).mkdir(mode=0o755, parents=True, exist_ok=True)
1✔
121

122
        for pid in pids:
1✔
123
            record = record_class.get_record_by_pid(pid)
1✔
124

125
            if serializer:
1✔
126
                record = serializer.dump(record)
1✔
127
            else:
128
                record = record.dumps()
1✔
129

130
            for file in record.get("files", []):
1✔
131
                if file.get("uri"):
1✔
132
                    target_path = join(output_dir.name, pid, file["key"])
1✔
133
                    pathlib.Path(dirname(target_path)).mkdir(
1✔
134
                        mode=0o755, parents=True, exist_ok=True
135
                    )
136
                    shutil.copyfile(file["uri"], target_path)
1✔
137
                    file.pop("uri")
1✔
138
                    file["path"] = f'./{pid}/{file["key"]}'
1✔
139

140
            records.append(record)
1✔
141

142
        if records:
1✔
143
            # Write data
144
            output_file = join(output_dir.name, "data.json")
1✔
145
            f = open(output_file, "w")
1✔
146
            f.write(json.dumps(records))
1✔
147
            f.close()
1✔
148

149
        click.secho("Finished", fg="green")
1✔
150

151
    except Exception as err:
1✔
152
        click.secho(f"An error occured during export: {err}", fg="red")
1✔
153

154

155
def create_personal(name, user_id, scopes=None, is_internal=False, access_token=None):
1✔
156
    """Create a personal access token.
157

158
    A token that is bound to a specific user and which doesn't expire, i.e.
159
    similar to the concept of an API key.
160

161
    :param name: Client name.
162
    :param user_id: User ID.
163
    :param scopes: The list of permitted scopes. (Default: ``None``)
164
    :param is_internal: If ``True`` it's a internal access token.
165
            (Default: ``False``)
166
    :param access_token: personalized access_token.
167
    :returns: A new access token.
168
    """
169
    with db.session.begin_nested():
1✔
170
        scopes = " ".join(scopes) if scopes else ""
1✔
171

172
        client = Client(
1✔
173
            name=name,
174
            user_id=user_id,
175
            is_internal=True,
176
            is_confidential=False,
177
            _default_scopes=scopes,
178
        )
179
        client.gen_salt()
1✔
180

181
        if not access_token:
1✔
182
            access_token = gen_salt(
×
183
                current_app.config.get("OAUTH2SERVER_TOKEN_PERSONAL_SALT_LEN")
184
            )
185
        token = Token(
1✔
186
            client_id=client.client_id,
187
            user_id=user_id,
188
            access_token=access_token,
189
            expires=None,
190
            _scopes=scopes,
191
            is_personal=True,
192
            is_internal=is_internal,
193
        )
194
        db.session.add(client)
1✔
195
        db.session.add(token)
1✔
196

197
    return token
1✔
198

199

200
@utils.command()
1✔
201
@click.option("-n", "--name", required=True)
1✔
202
@click.option(
1✔
203
    "-u", "--user", required=True, callback=process_user, help="User ID or email."
204
)
205
@click.option("-s", "--scope", "scopes", multiple=True, callback=process_scopes)
1✔
206
@click.option("-i", "--internal", is_flag=True)
1✔
207
@click.option(
1✔
208
    "-t",
209
    "--access_token",
210
    "access_token",
211
    required=False,
212
    help="personalized access_token.",
213
)
214
@with_appcontext
1✔
215
def token_create(name, user, scopes, internal, access_token):
1✔
216
    """Create a personal OAuth token."""
217
    if user:
1✔
218
        token = create_personal(
1✔
219
            name,
220
            user.id,
221
            scopes=scopes,
222
            is_internal=internal,
223
            access_token=access_token,
224
        )
225
        db.session.commit()
1✔
226
        click.secho(token.access_token, fg="blue")
1✔
227
    else:
228
        click.secho("No user found", fg="red")
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc