
askomics / flaskomics · build 4132887853 (pending completion)
Triggered by a push, run via github-actions (GitHub)
Commit: Add 'anonymous_query' mode (#384)

357 of 357 new or added lines in 16 files covered. (100.0%)

6166 of 7374 relevant lines covered (83.62%)

0.84 hits per line
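For reference, the headline percentage is simply covered lines divided by relevant lines: 6166 / 7374 ≈ 0.8362, i.e. 83.62%.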

Source File
/askomics/tasks.py (file coverage: 0.0%)
"""Async task

Attributes
----------
app : Flask
    Flask app
celery : Celery
    Celery object
"""
import sys
import traceback

from askomics.app import create_app, create_celery
from askomics.libaskomics.Dataset import Dataset
from askomics.libaskomics.DatasetsHandler import DatasetsHandler
from askomics.libaskomics.FilesHandler import FilesHandler
from askomics.libaskomics.LocalAuth import LocalAuth
from askomics.libaskomics.Result import Result
from askomics.libaskomics.ResultsHandler import ResultsHandler
from askomics.libaskomics.SparqlQuery import SparqlQuery
from askomics.libaskomics.SparqlQueryLauncher import SparqlQueryLauncher

from celery.schedules import crontab

app = create_app(config='config/askomics.ini')
celery = create_celery(app)

@celery.task(bind=True, name="integrate")
def integrate(self, session, data, host_url):
    """Integrate a file into the triplestore

    Parameters
    ----------
    session : dict
        AskOmics session
    data : dict
        fileId: file to integrate
        dataset_id: id of the dataset
        public: integrate as public or private data
    host_url : string
        AskOmics host URL

    Returns
    -------
    dict
        error: True if error, else False
        errorMessage: the error message if error, else an empty string
    """
    files_handler = FilesHandler(app, session, host_url=host_url, external_endpoint=data["externalEndpoint"], custom_uri=data["customUri"], external_graph=data['externalGraph'])
    files_handler.handle_files([data["fileId"], ])

    # Only admins may integrate data as public, unless single tenant mode is on
    public = (data.get("public", False) if session["user"]["admin"] else False) or app.iniconfig.getboolean("askomics", "single_tenant", fallback=False)

    for file in files_handler.files:

        try:

            dataset_info = {
                "celery_id": self.request.id,
                "id": data["dataset_id"],
                "file_id": file.id,
                "name": file.human_name,
                "graph_name": file.file_graph,
                "public": public
            }

            dataset = Dataset(app, session, dataset_info)
            dataset.update_in_db("started", update_date=True, update_graph=True)

            if file.type == "csv/tsv":
                file.integrate(data["dataset_id"], data.get('columns_type'), data.get('header_names'), public=public)
            elif file.type == "gff/gff3":
                file.integrate(data["dataset_id"], data.get("entities"), public=public)
            elif file.type in ('rdf/ttl', 'rdf/xml', 'rdf/nt'):
                file.integrate(public=public)
            elif file.type == "bed":
                file.integrate(data["dataset_id"], data.get("entity_name"), public=public)
            # Integration succeeded
            dataset.update_in_db("success", ntriples=file.ntriples)
        except Exception as e:
            traceback.print_exc(file=sys.stdout)
            trace = traceback.format_exc()
            dataset.update_in_db("failure", error=True, error_message=str(e), traceback=trace)
            # Rollback, then re-raise so celery marks the task as failed
            file.rollback()
            raise e

    return {
        'error': False,
        'errorMessage': ''
    }

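# Illustrative usage (not part of this module): the task is enqueued through
# celery's standard API. The values below are assumptions; the keys mirror the
# ones read by integrate() above.
#
#     integrate.delay(
#         session,
#         {"fileId": 42, "dataset_id": 1, "public": False,
#          "externalEndpoint": None, "customUri": None, "externalGraph": None},
#         "http://localhost:5000",
#     )
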
@celery.task(bind=True, name='delete_datasets')
def delete_datasets(self, session, datasets_info, admin=False):
    """Delete datasets from database and triplestore

    Parameters
    ----------
    session : dict
        AskOmics session
    datasets_info : list of dict
        Ids of datasets to delete
    admin : bool
        True to delete the datasets as an admin

    Returns
    -------
    dict
        error: True if error, else False
        errorMessage: the error message if error, else an empty string
    """
    try:
        datasets_handler = DatasetsHandler(app, session, datasets_info=datasets_info)
        datasets_handler.handle_datasets(admin=admin)
        datasets_handler.update_status_in_db("deleting", admin=admin)
        datasets_handler.delete_datasets(admin=admin)

    except Exception as e:
        traceback.print_exc(file=sys.stdout)
        raise e

    return {
        'error': False,
        'errorMessage': ''
    }


134

135
@celery.task(bind=True, name="query")
×
136
def query(self, session, info):
×
137
    """Save the query results in filesystem and db
138

139
    Parameters
140
    ----------
141
    session : dict
142
        AskOmics session
143
    graph_state : dict
144
        JSON graph state
145

146
    Returns
147
    -------
148
    dict
149
        error: True if error, else False
150
        errorMessage: the error message of error, else an empty string
151
    """
152
    try:
×
153
        info["celery_id"] = self.request.id
×
154
        result = Result(app, session, info, force_no_db=True)
×
155

156
        query = SparqlQuery(app, session, info["graph_state"])
×
157
        query.build_query_from_json(preview=False, for_editor=False)
×
158
        federated = query.is_federated()
×
159
        result.populate_db(query.graphs, query.endpoints)
×
160

161
        info["query"] = query.sparql
×
162
        info["graphs"] = query.graphs
×
163
        info["endpoints"] = query.endpoints
×
164
        info["federated"] = federated
×
165
        info["selects"] = query.selects
×
166

167
        # Save job in database database
168
        result.set_celery_id(self.request.id)
×
169
        result.update_db_status("started", update_celery=True, update_date=True)
×
170

171
        # launch query
172

173
        headers = info["selects"]
×
174
        results = []
×
175
        if info["graphs"] or app.iniconfig.getboolean("askomics", "single_tenant", fallback=False):
×
176
            query_launcher = SparqlQueryLauncher(app, session, get_result_query=True, federated=info["federated"], endpoints=info["endpoints"])
×
177
            headers, results = query_launcher.process_query(info["query"], isql_api=True)
×
178

179
        # write result to a file
180
        file_size = result.save_result_in_file(headers, results)
×
181

182
        # Update database status
183
        result.update_db_status("success", size=file_size)
×
184

185
    except Exception as e:
×
186
        traceback.print_exc(file=sys.stdout)
×
187
        trace = traceback.format_exc()
×
188
        result.update_db_status("error", error=True, error_message=str(e), traceback=trace)
×
189
        result.rollback()
×
190
        raise e
×
191
        return {
×
192
            'error': True,
193
            'errorMessage': str(e)
194
        }
195
    return {
×
196
        'error': False,
197
        'errorMessage': ''
198
    }
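# Illustrative tracking sketch (not part of this module): once enqueued, the
# job can be polled through celery's AsyncResult API; 'graph_state' stands in
# for the client's JSON graph state.
#
#     task = query.delay(session, {"graph_state": graph_state})
#     print(celery.AsyncResult(task.id).state)  # e.g. "PENDING", "SUCCESS"
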
@celery.task(bind=True, name="sparql_query")
def sparql_query(self, session, info):
    """Save the sparql query results in filesystem and db

    Parameters
    ----------
    session : dict
        AskOmics session
    info : dict
        Sparql query info

    Returns
    -------
    dict
        error: True if error, else False
        errorMessage: the error message if error, else an empty string
    """
    try:
        info["celery"] = self.request.id
        result = Result(app, session, info, force_no_db=True)

        # Save job in db
        result.set_celery_id(self.request.id)
        result.update_db_status("started", update_celery=True)

        query_launcher = SparqlQueryLauncher(app, session, get_result_query=True, federated=info["federated"], endpoints=info["endpoints"])
        header, data = query_launcher.process_query(info["sparql_query"], isql_api=True)

        # Write results in a file
        file_size = result.save_result_in_file(header, data)

        # Update database status
        result.update_db_status("success", size=file_size)

    except Exception as e:
        traceback.print_exc(file=sys.stdout)
        trace = traceback.format_exc()
        result.update_db_status("error", error=True, error_message=str(e), traceback=trace)
        raise e

    return {
        'error': False,
        'errorMessage': ''
    }

@celery.task(bind=True, name="delete_users_data")
def delete_users_data(self, session, users, delete_user):
    """Delete users directory and RDF data

    Parameters
    ----------
    session : dict
        AskOmics session
    users : list
        List of users to delete
    delete_user : boolean
        True to delete the user account itself, False to delete only its data

    Returns
    -------
    dict
        error: True if error, else False
        errorMessage: the error message if error, else an empty string
    """
    try:
        local_auth = LocalAuth(app, session)

        for user in users:
            local_auth.delete_user_directory(user, delete_user)
            local_auth.delete_user_rdf(user["username"])

    except Exception as e:
        traceback.print_exc(file=sys.stdout)
        return {
            'error': True,
            'errorMessage': str(e)
        }
    return {
        'error': False,
        'errorMessage': ''
    }

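# Illustrative call (assumption: each user dict provides at least "username",
# the only key read directly above):
#
#     delete_users_data.delay(session, [{"username": "jdoe"}], delete_user=True)
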
@celery.task(bind=True, name="send_mail_new_user")
def send_mail_new_user(self, session, user):
    """Send a mail to the new user

    Parameters
    ----------
    session : dict
        AskOmics session
    user : dict
        New user
    """
    local_auth = LocalAuth(app, session)
    local_auth.send_mail_to_new_user(user)

@celery.task(bind=True, name="download_file")
def download_file(self, session, url):
    """Download a file from a URL

    Parameters
    ----------
    session : dict
        AskOmics session
    url : string
        URL of the file to download
    """
    files = FilesHandler(app, session)
    files.download_url(url, self.request.id)

@celery.on_after_configure.connect
def cron_cleanup(sender, **kwargs):
    # If anonymous queries are allowed, register a 'cleanup' task that runs
    # every two hours (unspecified crontab fields default to '*', so the
    # minute must be pinned to 0)
    if app.iniconfig.getboolean('askomics', 'anonymous_query', fallback=False):
        sender.add_periodic_task(
            crontab(minute=0, hour='*/2'),
            cleanup_anonymous_data.s(),
        )
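
# Scheduling note (illustrative): add_periodic_task also accepts a plain
# number of seconds, so a fixed-interval equivalent would be:
#
#     sender.add_periodic_task(2 * 60 * 60, cleanup_anonymous_data.s())
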
@celery.task(bind=True, name="cleanup_anonymous")
def cleanup_anonymous_data(self):
    """Delete anonymous query results older than the configured retention"""
    periodicity = app.iniconfig.getint('askomics', 'anonymous_query_cleanup', fallback=60)
    handler = ResultsHandler(app, {})
    # Cleanup jobs older than 'periodicity' days
    handler.delete_older_results(periodicity, "day", "0")
    # Cleanup failed jobs older than 1 hour
    handler.delete_older_results(1, "hour", "0", "error")