• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

askomics / flaskomics / 8143972779

04 Mar 2024 04:54PM UTC coverage: 83.219%. Remained the same
8143972779

Pull #449

github

web-flow
Merge 6d2bc1d30 into 91939bc97
Pull Request #449: Bump xml2js and parse-bmfont-xml

6283 of 7550 relevant lines covered (83.22%)

0.83 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/askomics/tasks.py
1
"""Async task
2

3
Attributes
4
----------
5
app : Flask
6
    Flask app
7
celery : Celery
8
    Celery object
9
"""
10
import sys
×
11
import traceback
×
12

13
from askomics.app import create_app, create_celery
×
14
from askomics.libaskomics.Dataset import Dataset
×
15
from askomics.libaskomics.DatasetsHandler import DatasetsHandler
×
16
from askomics.libaskomics.FilesHandler import FilesHandler
×
17
from askomics.libaskomics.LocalAuth import LocalAuth
×
18
from askomics.libaskomics.Result import Result
×
19
from askomics.libaskomics.ResultsHandler import ResultsHandler
×
20
from askomics.libaskomics.SparqlQuery import SparqlQuery
×
21
from askomics.libaskomics.SparqlQueryLauncher import SparqlQueryLauncher
×
22

23
from celery.schedules import crontab
×
24

25
# The Flask app and the Celery worker share the same ini configuration;
# every task below runs against this single module-level app instance.
app = create_app(config='config/askomics.ini')
celery = create_celery(app)
27

28

29
@celery.task(bind=True, name="save_preview")
def save_preview(self, session, fileId):
    """Compute the file preview in backend and store it in DB

    Parameters
    ----------
    session : dict
        AskOmics session
    fileId : string
        file to integrate
    """
    handler = FilesHandler(app, session)
    handler.handle_files([fileId, ])

    for current_file in handler.files:
        current_file.save_preview()

    return {'error': False, 'errorMessage': ''}
49

50

51
@celery.task(bind=True, name="integrate")
def integrate(self, session, data, host_url):
    """Integrate a file into the triplestore

    Parameters
    ----------
    session : dict
        AskOmics session
    data : dict
        fileId: file to integrate
        public: integrate as public or private data
    host_url : string
        AskOmics host url

    Returns
    -------
    dict
        error: True if error, else False
        errorMessage: the error message of error, else an empty string

    Raises
    ------
    Exception
        Re-raised after rollback so Celery marks the task as failed
    """
    files_handler = FilesHandler(app, session, host_url=host_url, external_endpoint=data["externalEndpoint"], custom_uri=data["customUri"], external_graph=data['externalGraph'])
    files_handler.handle_files([data["fileId"], ])

    # Only admins may publish datasets, unless the instance is single-tenant
    public = (data.get("public", False) if session["user"]["admin"] else False) or app.iniconfig.getboolean("askomics", "single_tenant", fallback=False)

    for file in files_handler.files:

        # Guard so the except clause can tell whether the DB row was created
        dataset = None
        try:

            dataset_info = {
                "celery_id": self.request.id,
                "id": data["dataset_id"],
                "file_id": file.id,
                "name": file.human_name,
                "graph_name": file.file_graph,
                "public": public
            }

            dataset = Dataset(app, session, dataset_info)
            dataset.update_in_db("started", update_date=True, update_graph=True)

            if file.type == "csv/tsv":
                file.integrate(data["dataset_id"], data.get('columns_type'), data.get('header_names'), public=public)
            elif file.type == "gff/gff3":
                file.integrate(data["dataset_id"], data.get("entities"), public=public)
            elif file.type in ('rdf/ttl', 'rdf/xml', 'rdf/nt'):
                file.integrate(public=public)
            elif file.type == "bed":
                file.integrate(data["dataset_id"], data.get("entity_name"), public=public)
            # done
            dataset.update_in_db("success", ntriples=file.ntriples)
        except Exception as e:
            traceback.print_exc(file=sys.stdout)
            trace = traceback.format_exc()
            if dataset is not None:
                dataset.update_in_db("failure", error=True, error_message=str(e), traceback=trace)
            # Rollback the partial integration, then re-raise so Celery
            # records the failure (bare raise keeps the original traceback).
            # NOTE: the original `return {...}` after `raise e` was
            # unreachable and has been removed.
            file.rollback()
            raise

    return {
        'error': False,
        'errorMessage': ''
    }
118

119

120
@celery.task(bind=True, name='delete_datasets')
def delete_datasets(self, session, datasets_info, admin=False):
    """Delete datasets from database and triplestore

    Parameters
    ----------
    session : dict
        AskOmics session
    datasets_info : list of dict
        Ids of datasets to delete
    admin : bool
        True when the deletion is performed with admin rights

    Returns
    -------
    dict
        error: True if error, else False
        errorMessage: the error message of error, else an empty string

    Raises
    ------
    Exception
        Re-raised so Celery marks the task as failed
    """
    try:
        datasets_handler = DatasetsHandler(app, session, datasets_info=datasets_info)
        datasets_handler.handle_datasets(admin=admin)
        datasets_handler.update_status_in_db("deleting", admin=admin)
        datasets_handler.delete_datasets(admin=admin)

    except Exception:
        traceback.print_exc(file=sys.stdout)
        # Bare raise preserves the original traceback. The original
        # `return {...}` after `raise e` was unreachable and is removed.
        raise

    return {
        'error': False,
        'errorMessage': ''
    }
155

156

157
@celery.task(bind=True, name="query")
def query(self, session, info):
    """Save the query results in filesystem and db

    Parameters
    ----------
    session : dict
        AskOmics session
    info : dict
        Query info, including the JSON graph state under "graph_state"

    Returns
    -------
    dict
        error: True if error, else False
        errorMessage: the error message of error, else an empty string

    Raises
    ------
    Exception
        Re-raised after updating the job status so Celery records the failure
    """
    # Guard so the except clause can tell whether Result() was constructed
    result = None
    try:
        info["celery_id"] = self.request.id
        result = Result(app, session, info, force_no_db=True)

        query = SparqlQuery(app, session, info["graph_state"])
        query.build_query_from_json(preview=False, for_editor=False)
        federated = query.is_federated()
        result.populate_db(query.graphs, query.endpoints)

        info["query"] = query.sparql
        info["graphs"] = query.graphs
        info["endpoints"] = query.endpoints
        info["federated"] = federated
        info["selects"] = query.selects

        # Save job in database
        result.set_celery_id(self.request.id)
        result.update_db_status("started", update_celery=True, update_date=True)

        # Launch query
        headers = info["selects"]
        results = []
        if info["graphs"] or app.iniconfig.getboolean("askomics", "single_tenant", fallback=False):
            query_launcher = SparqlQueryLauncher(app, session, get_result_query=True, federated=info["federated"], endpoints=info["endpoints"])
            headers, results = query_launcher.process_query(info["query"], isql_api=True)

        # Write result to a file
        file_size = result.save_result_in_file(headers, results)

        # Update database status
        result.update_db_status("success", size=file_size)

    except Exception as e:
        traceback.print_exc(file=sys.stdout)
        trace = traceback.format_exc()
        if result is not None:
            result.update_db_status("error", error=True, error_message=str(e), traceback=trace)
            result.rollback()
        # Bare raise preserves the original traceback. The original
        # `return {...}` after `raise e` was unreachable and is removed.
        raise
    return {
        'error': False,
        'errorMessage': ''
    }
221

222

223
@celery.task(bind=True, name="sparql_query")
def sparql_query(self, session, info):
    """Save the sparql query results in filesystem and db

    Parameters
    ----------
    session : dict
        AskOmics session
    info : dict
        sparql query

    Returns
    -------
    dict
        error: True if error, else False
        errorMessage: the error message of error, else an empty string

    Raises
    ------
    Exception
        Re-raised after updating the job status so Celery records the failure
    """
    # Guard so the except clause can tell whether Result() was constructed
    result = None
    try:
        # NOTE(review): the sibling `query` task stores this id under
        # "celery_id" — confirm whether "celery" here is intentional.
        info["celery"] = self.request.id
        result = Result(app, session, info, force_no_db=True)

        # Save job in db
        result.set_celery_id(self.request.id)
        result.update_db_status("started", update_celery=True)

        query_launcher = SparqlQueryLauncher(app, session, get_result_query=True, federated=info["federated"], endpoints=info["endpoints"])
        header, data = query_launcher.process_query(info["sparql_query"], isql_api=True)

        # Write results in file
        file_size = result.save_result_in_file(header, data)

        # Update database status
        result.update_db_status("success", size=file_size)

    except Exception as e:
        traceback.print_exc(file=sys.stdout)
        trace = traceback.format_exc()
        if result is not None:
            result.update_db_status("error", error=True, error_message=str(e), traceback=trace)
        # Bare raise preserves the original traceback. The original
        # `return {...}` after `raise e` was unreachable and is removed.
        raise
    return {
        'error': False,
        'errorMessage': ''
    }
270

271

272
@celery.task(bind=True, name="delete_users_data")
def delete_users_data(self, session, users, delete_user):
    """Delete users directory and RDF data

    Parameters
    ----------
    session : dict
        AskOmics session
    users : list
        List of users to delete
    delete_user : boolean
        True to delete the whole user, False to delete only their data

    Returns
    -------
    dict
        error: True if error, else False
        errorMessage: the error message of error, else an empty string
    """
    try:
        auth = LocalAuth(app, session)
        for target in users:
            auth.delete_user_directory(target, delete_user)
            auth.delete_user_rdf(target["username"])

    except Exception as e:
        traceback.print_exc(file=sys.stdout)
        return {'error': True, 'errorMessage': str(e)}

    return {'error': False, 'errorMessage': ''}
308

309

310
@celery.task(bind=True, name="send_mail_new_user")
def send_mail_new_user(self, session, user):
    """Send a welcome mail to a newly created user

    Parameters
    ----------
    session : dict
        AskOmics session
    user : dict
        New user
    """
    auth = LocalAuth(app, session)
    auth.send_mail_to_new_user(user)
×
323

324

325
@celery.task(bind=True, name="download_file")
def download_file(self, session, url):
    """Download a distant file into the user's file storage

    (The original docstring was copy-pasted from send_mail_new_user.)

    Parameters
    ----------
    session : dict
        AskOmics session
    url : string
        URL of the file to download
    """
    files = FilesHandler(app, session)
    # Task is bound, so use self.request.id rather than reaching through
    # the task function attribute (download_file.request.id)
    files.download_url(url, self.request.id)
×
338

339

340
@celery.on_after_configure.connect
def cron_cleanup(sender, **kwargs):
    """Register the periodic anonymous-data cleanup task when enabled."""
    # If anonymous queries are allowed, add a 'cleanup' task every 2 hours.
    # minute=0 is required: celery's crontab defaults minute to '*', so the
    # original crontab(hour='*/2') fired every minute of every second hour.
    if app.iniconfig.getboolean('askomics', 'anonymous_query', fallback=False):
        sender.add_periodic_task(
            crontab(minute=0, hour='*/2'),
            cleanup_anonymous_data.s(),
        )
348

349

350
@celery.task(bind=True, name="cleanup_anonymous")
def cleanup_anonymous_data(self):
    """Remove anonymous query results that exceeded their retention period."""
    retention_days = app.iniconfig.getint('askomics', 'anonymous_query_cleanup', fallback=60)
    results_handler = ResultsHandler(app, {})
    # Drop anonymous jobs older than the configured number of days
    results_handler.delete_older_results(retention_days, "day", "0")
    # Drop failed anonymous jobs after one hour
    results_handler.delete_older_results(1, "hour", "0", "error")
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc